mirror of
https://github.com/go-gitea/gitea.git
synced 2026-05-25 16:08:46 +09:00
feat: execute post run cleanup when workflow is cancelled (#37275)
## Fixes #36983 ## Summary 1. Add transitional `Cancelling` status (between `Running` and `Cancelled`); cancel flow marks active tasks `Cancelling`, runner finalizes to `Cancelled` on terminal result. 2. Taskless jobs cancel directly (no runner to finalize). 3. Runner-protocol responses map `Cancelling` → `RESULT_CANCELLED`. 4. Run/job aggregation treats `Cancelling` as active. 5. Status mapping/aggregation tests + en-US locale added. **Problem** When a workflow was cancelled from the UI, jobs were marked cancelled immediately, which could skip post-run cleanup behavior. ## Solution Use a transitional status path: Running → Cancelling → Cancelled This allows runner finalization and cleanup path execution before final terminal state. **Testing** > 1. go test -tags "sqlite sqlite_unlock_notify" ./models/actions -run "TestAggregateJobStatus|TestStatusAsResult|TestStatusFromResult" > 2. go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.11.4 run ./models/actions/... ./routers/api/actions/runner/... ## Related - act_runner: https://gitea.com/gitea/act_runner/pulls/825 — independent; this PR's capability gate keeps legacy runners on the immediate-cancel path. The new flow activates only for runners that advertise the `cancelling` capability. Co-authored-by: Nicolas <bircni@icloud.com> Co-authored-by: silverwind <me@silverwind.io> Co-authored-by: Claude (Opus 4.7) <noreply@anthropic.com> Co-authored-by: Zettat123 <zettat123@gmail.com> Co-authored-by: Giteabot <teabot@gitea.io>
This commit is contained in:
committed by
GitHub
parent
ae9b34897f
commit
e7af84df72
@@ -144,7 +144,7 @@ func CleanupEphemeralRunners(ctx context.Context) error {
|
||||
From(builder.Select("*").From("`action_runner`"), "`action_runner`"). // mysql needs this redundant subquery
|
||||
Join("INNER", "`action_task`", "`action_task`.`runner_id` = `action_runner`.`id`").
|
||||
Where(builder.Eq{"`action_runner`.`ephemeral`": true}).
|
||||
And(builder.NotIn("`action_task`.`status`", actions_model.StatusWaiting, actions_model.StatusRunning, actions_model.StatusBlocked))
|
||||
And(builder.NotIn("`action_task`.`status`", actions_model.StatusWaiting, actions_model.StatusRunning, actions_model.StatusBlocked, actions_model.StatusCancelling))
|
||||
b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`")
|
||||
res, err := db.GetEngine(ctx).Exec(b)
|
||||
if err != nil {
|
||||
|
||||
@@ -19,20 +19,30 @@ import (
|
||||
webhook_module "code.gitea.io/gitea/modules/webhook"
|
||||
)
|
||||
|
||||
// StopZombieTasks stops the task which have running status, but haven't been updated for a long time
|
||||
// StopZombieTasks stops tasks in running/cancelling status that haven't been updated for a long time
|
||||
func StopZombieTasks(ctx context.Context) error {
|
||||
return stopTasks(ctx, actions_model.FindTaskOptions{
|
||||
Status: actions_model.StatusRunning,
|
||||
return stopTasksByStatuses(ctx, actions_model.FindTaskOptions{
|
||||
UpdatedBefore: timeutil.TimeStamp(time.Now().Add(-setting.Actions.ZombieTaskTimeout).Unix()),
|
||||
})
|
||||
}, actions_model.StatusRunning, actions_model.StatusCancelling)
|
||||
}
|
||||
|
||||
// StopEndlessTasks stops the tasks which have running status and continuous updates, but don't end for a long time
|
||||
// StopEndlessTasks stops tasks in running/cancelling status with continuous updates that don't end for a long time
|
||||
func StopEndlessTasks(ctx context.Context) error {
|
||||
return stopTasks(ctx, actions_model.FindTaskOptions{
|
||||
Status: actions_model.StatusRunning,
|
||||
return stopTasksByStatuses(ctx, actions_model.FindTaskOptions{
|
||||
StartedBefore: timeutil.TimeStamp(time.Now().Add(-setting.Actions.EndlessTaskTimeout).Unix()),
|
||||
})
|
||||
}, actions_model.StatusRunning, actions_model.StatusCancelling)
|
||||
}
|
||||
|
||||
func stopTasksByStatuses(ctx context.Context, opts actions_model.FindTaskOptions, statuses ...actions_model.Status) error {
|
||||
for _, status := range statuses {
|
||||
optsByStatus := opts
|
||||
optsByStatus.Status = status
|
||||
if err := stopTasks(ctx, optsByStatus); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
|
||||
@@ -59,7 +69,7 @@ func shouldBlockJobByConcurrency(ctx context.Context, job *actions_model.ActionR
|
||||
return false, nil
|
||||
}
|
||||
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning, actions_model.StatusCancelling})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("GetConcurrentRunAttemptsAndJobs: %w", err)
|
||||
}
|
||||
@@ -89,7 +99,7 @@ func shouldBlockRunByConcurrency(ctx context.Context, attempt *actions_model.Act
|
||||
return false, nil
|
||||
}
|
||||
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning})
|
||||
attempts, jobs, err := actions_model.GetConcurrentRunAttemptsAndJobs(ctx, attempt.RepoID, attempt.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning, actions_model.StatusCancelling})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("find concurrent runs and jobs: %w", err)
|
||||
}
|
||||
@@ -123,7 +133,11 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
|
||||
jobs := make([]*actions_model.ActionRunJob, 0, len(tasks))
|
||||
for _, task := range tasks {
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
if err := actions_model.StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
|
||||
stopStatus := actions_model.StatusFailure
|
||||
if task.Status == actions_model.StatusCancelling {
|
||||
stopStatus = actions_model.StatusCancelled
|
||||
}
|
||||
if err := actions_model.StopTask(ctx, task.ID, stopStatus); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := task.LoadJob(ctx); err != nil {
|
||||
@@ -157,44 +171,18 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error {
|
||||
|
||||
// CancelAbandonedJobs cancels jobs that have not been picked by any runner for a long time
|
||||
func CancelAbandonedJobs(ctx context.Context) error {
|
||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
|
||||
abandonedJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
|
||||
Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked},
|
||||
UpdatedBefore: timeutil.TimeStampNow().AddDuration(-setting.Actions.AbandonedJobTimeout),
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("find abandoned tasks: %v", err)
|
||||
log.Warn("find abandoned jobs: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
now := timeutil.TimeStampNow()
|
||||
|
||||
updatedJobs := []*actions_model.ActionRunJob{}
|
||||
|
||||
for _, job := range jobs {
|
||||
job.Status = actions_model.StatusCancelled
|
||||
job.Stopped = now
|
||||
updated := false
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := job.LoadAttributes(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
updated = n > 0
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Warn("cancel abandoned job %v: %v", job.ID, err)
|
||||
// go on
|
||||
}
|
||||
if job.Run == nil || job.Run.Repo == nil {
|
||||
continue // error occurs during loading attributes, the following code that depends on "Run.Repo" will fail, so ignore and skip
|
||||
}
|
||||
if updated {
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
updatedJobs, err := actions_model.CancelJobs(ctx, abandonedJobs)
|
||||
if err != nil {
|
||||
log.Warn("cancel abandoned jobs: %v", err)
|
||||
}
|
||||
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, updatedJobs)
|
||||
|
||||
90
services/actions/clear_tasks_test.go
Normal file
90
services/actions/clear_tasks_test.go
Normal file
@@ -0,0 +1,90 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/models/unittest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func createConflictingCancellingJob(t *testing.T, concurrencyGroup string, runIndex int64) *actions_model.ActionRunJob {
|
||||
t.Helper()
|
||||
|
||||
run := &actions_model.ActionRun{
|
||||
RepoID: 1,
|
||||
OwnerID: 2,
|
||||
TriggerUserID: 2,
|
||||
WorkflowID: "test.yml",
|
||||
Index: runIndex,
|
||||
Ref: "refs/heads/main",
|
||||
Status: actions_model.StatusBlocked,
|
||||
}
|
||||
require.NoError(t, db.Insert(t.Context(), run))
|
||||
|
||||
attempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: run.RepoID,
|
||||
RunID: run.ID,
|
||||
Attempt: 1,
|
||||
TriggerUserID: run.TriggerUserID,
|
||||
Status: actions_model.StatusBlocked,
|
||||
ConcurrencyGroup: concurrencyGroup,
|
||||
}
|
||||
require.NoError(t, db.Insert(t.Context(), attempt))
|
||||
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RunAttemptID: attempt.ID,
|
||||
AttemptJobID: 1,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0",
|
||||
Name: "conflicting-cancelling-job",
|
||||
JobID: "conflicting-cancelling-job",
|
||||
Status: actions_model.StatusCancelling,
|
||||
ConcurrencyGroup: concurrencyGroup,
|
||||
}
|
||||
require.NoError(t, db.Insert(t.Context(), job))
|
||||
|
||||
return job
|
||||
}
|
||||
|
||||
func TestShouldBlockJobByConcurrency_CancellingJobBlocks(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
const concurrencyGroup = "test-cancelling-job-blocks"
|
||||
createConflictingCancellingJob(t, concurrencyGroup, 9903)
|
||||
|
||||
job := &actions_model.ActionRunJob{
|
||||
RepoID: 1,
|
||||
RawConcurrency: concurrencyGroup,
|
||||
IsConcurrencyEvaluated: true,
|
||||
ConcurrencyGroup: concurrencyGroup,
|
||||
}
|
||||
|
||||
shouldBlock, err := shouldBlockJobByConcurrency(t.Context(), job)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, shouldBlock)
|
||||
}
|
||||
|
||||
func TestShouldBlockRunByConcurrency_CancellingJobBlocks(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
const concurrencyGroup = "test-cancelling-run-blocks"
|
||||
createConflictingCancellingJob(t, concurrencyGroup, 9904)
|
||||
|
||||
attempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: 1,
|
||||
ConcurrencyGroup: concurrencyGroup,
|
||||
}
|
||||
|
||||
shouldBlock, err := shouldBlockRunByConcurrency(t.Context(), attempt)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, shouldBlock)
|
||||
}
|
||||
@@ -181,11 +181,13 @@ func toCommitStatusDescription(job *actions_model.ActionRunJob) string {
|
||||
case actions_model.StatusFailure:
|
||||
return fmt.Sprintf("Failing after %s", job.Duration())
|
||||
case actions_model.StatusCancelled:
|
||||
return fmt.Sprintf("Cancelled after %s", job.Duration())
|
||||
return fmt.Sprintf("Canceled after %s", job.Duration())
|
||||
case actions_model.StatusSkipped:
|
||||
return "Skipped"
|
||||
case actions_model.StatusRunning:
|
||||
return "In progress"
|
||||
case actions_model.StatusCancelling:
|
||||
return "Canceling"
|
||||
case actions_model.StatusWaiting:
|
||||
return "Waiting to run"
|
||||
case actions_model.StatusBlocked:
|
||||
@@ -201,7 +203,7 @@ func toCommitStatus(status actions_model.Status) commitstatus.CommitStatusState
|
||||
return commitstatus.CommitStatusSuccess
|
||||
case actions_model.StatusFailure, actions_model.StatusCancelled:
|
||||
return commitstatus.CommitStatusFailure
|
||||
case actions_model.StatusWaiting, actions_model.StatusBlocked, actions_model.StatusRunning:
|
||||
case actions_model.StatusWaiting, actions_model.StatusBlocked, actions_model.StatusRunning, actions_model.StatusCancelling:
|
||||
return commitstatus.CommitStatusPending
|
||||
case actions_model.StatusSkipped:
|
||||
return commitstatus.CommitStatusSkipped
|
||||
|
||||
@@ -28,7 +28,8 @@ func TestCommitStatusDescription(t *testing.T) {
|
||||
}{
|
||||
{actions_model.StatusSuccess, 100, 102, "Successful in 2s"},
|
||||
{actions_model.StatusFailure, 100, 130, "Failing after 30s"},
|
||||
{actions_model.StatusCancelled, 100, 145, "Cancelled after 45s"},
|
||||
{actions_model.StatusCancelled, 100, 145, "Canceled after 45s"},
|
||||
{actions_model.StatusCancelling, 0, 0, "Canceling"},
|
||||
{actions_model.StatusSkipped, 0, 0, "Skipped"},
|
||||
{actions_model.StatusRunning, 0, 0, "In progress"},
|
||||
{actions_model.StatusWaiting, 0, 0, "Waiting to run"},
|
||||
|
||||
@@ -18,8 +18,9 @@ func NotifyWorkflowJobsAndRunsStatusUpdate(ctx context.Context, jobs []*actions_
|
||||
return
|
||||
}
|
||||
|
||||
// The input jobs may belong to different runs, so track each affected run.
|
||||
runs := make(map[int64]*actions_model.ActionRun, len(jobs))
|
||||
// The input jobs may belong to different runs, so track each affected run ID
|
||||
// and reload it later to avoid notifying with stale aggregate status.
|
||||
runRepoIDs := make(map[int64]int64, len(jobs))
|
||||
jobsByRunID := make(map[int64][]*actions_model.ActionRunJob)
|
||||
|
||||
for _, job := range jobs {
|
||||
@@ -29,17 +30,15 @@ func NotifyWorkflowJobsAndRunsStatusUpdate(ctx context.Context, jobs []*actions_
|
||||
}
|
||||
CreateCommitStatusForRunJobs(ctx, job.Run, job)
|
||||
|
||||
if _, ok := runs[job.RunID]; !ok {
|
||||
runs[job.RunID] = job.Run
|
||||
}
|
||||
runRepoIDs[job.RunID] = job.RepoID
|
||||
if _, ok := jobsByRunID[job.RunID]; !ok {
|
||||
jobsByRunID[job.RunID] = make([]*actions_model.ActionRunJob, 0)
|
||||
}
|
||||
jobsByRunID[job.RunID] = append(jobsByRunID[job.RunID], job)
|
||||
}
|
||||
|
||||
for _, run := range runs {
|
||||
NotifyWorkflowRunStatusUpdate(ctx, run)
|
||||
for runID, repoID := range runRepoIDs {
|
||||
NotifyWorkflowRunStatusUpdateWithReload(ctx, repoID, runID)
|
||||
}
|
||||
|
||||
for _, jobs := range jobsByRunID {
|
||||
|
||||
@@ -35,7 +35,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
||||
return nil, false, err
|
||||
}
|
||||
if has {
|
||||
if task.Status == actions_model.StatusWaiting || task.Status == actions_model.StatusRunning || task.Status == actions_model.StatusBlocked {
|
||||
if task.Status.In(actions_model.StatusWaiting, actions_model.StatusRunning, actions_model.StatusBlocked, actions_model.StatusCancelling) {
|
||||
return nil, false, nil
|
||||
}
|
||||
// task has been finished, remove it
|
||||
|
||||
Reference in New Issue
Block a user