mirror of
https://github.com/go-gitea/gitea.git
synced 2026-05-06 04:01:05 +09:00
## Problem
Workflow-level concurrency groups were evaluated — and jobs were parsed
— before the run was persisted, so `run.ID` was `0` and `github.run_id`
in the expression context resolved to an empty string. Expressions like:
```yaml
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
```
collapsed to `<workflow>-` on every push event (`head_ref` is empty on
push), so `cancel-in-progress` cancelled in-progress runs across
**unrelated branches**, not just the current one.
Reproduced on a 1.26 instance:
- push to `master` → `ci` run starts
- push to `feature-branch` → the `master` run gets cancelled
GitHub Actions' documented semantic: on push events `github.run_id` is
unique per run, so the group is unique → no cancellation; on PR events
`github.head_ref` is the source branch → cancellation is per-PR.
## Fix
Insert the run **before** parsing jobs or evaluating workflow-level
concurrency, so `run.ID` is populated in time for every expression that
reads `github.run_id` — not just the concurrency group, but also
`run-name`, job names, and `runs-on`.
`jobparser.Parse` now runs inside the `InsertRun` transaction, after
`db.Insert(ctx, run)`. Workflow-level concurrency evaluation runs next
and only mutates `run` in memory. All concurrency-derived fields
(`raw_concurrency`, `concurrency_group`, `concurrency_cancel`) plus
`status` and `title` are persisted in a single final `UpdateRun` at
end-of-transaction — one `INSERT` + one `UPDATE` per run in both the
concurrency and non-concurrency paths (matches pre-branch parity, one
fewer `UpdateRepoRunsNumbers` `COUNT` than the interim state).
`GenerateGiteaContext` now sets `run_id` from `run.ID` unconditionally;
every caller passes a persisted run.
**Verification**: tested end-to-end on a 1.26 deployment. Before the
patch, two successive `ci` pushes (one to master, one to a feature
branch) cross-cancelled each other. After the patch, the same pushes —
in both orders (master→branch, branch→master) — run to completion
simultaneously across 15+ runs with zero cancellations.
**Regression tests** in `services/actions/context_test.go`:
- `TestEvaluateRunConcurrency_RunIDFallback` — unit check that
`EvaluateRunConcurrencyFillModel` resolves `github.run_id` from
`run.ID`.
- `TestPrepareRunAndInsert_ExpressionsSeeRunID` — full-flow check: calls
`PrepareRunAndInsert` with `${{ github.run_id }}` in both `run-name` and
the concurrency group, then asserts the persisted `Title`,
`ConcurrencyGroup`, and `RawConcurrency` contain / survive the run's ID.
Re-ordering `db.Insert` relative to either parse or concurrency eval
fails this test.
## Relation to #37119
[#37119](https://github.com/go-gitea/gitea/pull/37119) also moves
concurrency evaluation into `InsertRun` but keeps it **before**
`db.Insert`, then tries to populate `run_id` only when `run.ID > 0` —
which is still `0` at that call site, so the cross-branch leak would
survive that PR as written. This PR fixes the ordering so that `run.ID`
is actually populated at eval time, and broadens it to cover parse-time
expression interpolation too.
Co-authored-by: Claude (Opus 4.7) <noreply@anthropic.com>
188 lines
6.4 KiB
Go
188 lines
6.4 KiB
Go
// Copyright 2025 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
actions_model "code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/models/db"
|
|
"code.gitea.io/gitea/modules/actions/jobparser"
|
|
"code.gitea.io/gitea/modules/util"
|
|
notify_service "code.gitea.io/gitea/services/notify"
|
|
|
|
act_model "github.com/nektos/act/pkg/model"
|
|
"go.yaml.in/yaml/v4"
|
|
)
|
|
|
|
// PrepareRunAndInsert prepares a run and inserts it into the database
|
|
// It parses the workflow content, evaluates concurrency if needed, and inserts the run and its jobs into the database.
|
|
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
|
func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model.ActionRun, inputsWithDefaults map[string]any) error {
|
|
if err := run.LoadAttributes(ctx); err != nil {
|
|
return fmt.Errorf("LoadAttributes: %w", err)
|
|
}
|
|
|
|
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
|
if err != nil {
|
|
return fmt.Errorf("GetVariablesOfRun: %w", err)
|
|
}
|
|
|
|
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(content)
|
|
if err != nil {
|
|
return fmt.Errorf("ReadWorkflowRawConcurrency: %w", err)
|
|
}
|
|
|
|
if err = InsertRun(ctx, run, content, vars, inputsWithDefaults, wfRawConcurrency); err != nil {
|
|
return fmt.Errorf("InsertRun: %w", err)
|
|
}
|
|
|
|
// Load the newly inserted jobs with all fields from database (the job models in InsertRun are partial, so load again)
|
|
allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
|
if err != nil {
|
|
return fmt.Errorf("FindRunJob: %w", err)
|
|
}
|
|
|
|
CreateCommitStatusForRunJobs(ctx, run, allJobs...)
|
|
|
|
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
|
|
for _, job := range allJobs {
|
|
notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InsertRun inserts a run
|
|
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
|
func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte, vars map[string]string, inputs map[string]any, wfRawConcurrency *act_model.RawConcurrency) error {
|
|
return db.WithTx(ctx, func(ctx context.Context) error {
|
|
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
run.Index = index
|
|
run.Title = util.EllipsisDisplayString(run.Title, 255)
|
|
run.Status = actions_model.StatusWaiting
|
|
|
|
// Insert before parsing jobs or evaluating workflow-level concurrency
|
|
// so that run.ID is populated. Expressions referencing github.run_id —
|
|
// in run-name, job names, runs-on, or a workflow-level concurrency
|
|
// group like `${{ github.head_ref || github.run_id }}` — would otherwise
|
|
// interpolate to an empty string.
|
|
if err := db.Insert(ctx, run); err != nil {
|
|
return err
|
|
}
|
|
|
|
giteaCtx := GenerateGiteaContext(run, nil)
|
|
jobs, err := jobparser.Parse(content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithInputs(inputs))
|
|
if err != nil {
|
|
return fmt.Errorf("parse workflow: %w", err)
|
|
}
|
|
|
|
titleChanged := len(jobs) > 0 && jobs[0].RunName != ""
|
|
if titleChanged {
|
|
run.Title = util.EllipsisDisplayString(jobs[0].RunName, 255)
|
|
}
|
|
|
|
if wfRawConcurrency != nil {
|
|
if err := EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars, inputs); err != nil {
|
|
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
|
|
}
|
|
run.Status, err = PrepareToStartRunWithConcurrency(ctx, run)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
|
|
var hasWaitingJobs bool
|
|
|
|
for _, v := range jobs {
|
|
id, job := v.Job()
|
|
needs := job.Needs()
|
|
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
|
|
return err
|
|
}
|
|
payload, _ := v.Marshal()
|
|
|
|
shouldBlockJob := len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked
|
|
|
|
job.Name = util.EllipsisDisplayString(job.Name, 255)
|
|
runJob := &actions_model.ActionRunJob{
|
|
RunID: run.ID,
|
|
RepoID: run.RepoID,
|
|
OwnerID: run.OwnerID,
|
|
CommitSHA: run.CommitSHA,
|
|
IsForkPullRequest: run.IsForkPullRequest,
|
|
Name: job.Name,
|
|
WorkflowPayload: payload,
|
|
JobID: id,
|
|
Needs: needs,
|
|
RunsOn: job.RunsOn(),
|
|
Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting),
|
|
}
|
|
// Parse workflow/job permissions (no clamping here)
|
|
if perms := ExtractJobPermissionsFromWorkflow(v, job); perms != nil {
|
|
runJob.TokenPermissions = perms
|
|
}
|
|
|
|
// check job concurrency
|
|
if job.RawConcurrency != nil {
|
|
rawConcurrency, err := yaml.Marshal(job.RawConcurrency)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal raw concurrency: %w", err)
|
|
}
|
|
runJob.RawConcurrency = string(rawConcurrency)
|
|
|
|
// do not evaluate job concurrency when it requires `needs`, the jobs with `needs` will be evaluated later by job emitter
|
|
if len(needs) == 0 {
|
|
err = EvaluateJobConcurrencyFillModel(ctx, run, runJob, vars, inputs)
|
|
if err != nil {
|
|
return fmt.Errorf("evaluate job concurrency: %w", err)
|
|
}
|
|
}
|
|
|
|
// If a job needs other jobs ("needs" is not empty), its status is set to StatusBlocked at the entry of the loop
|
|
// No need to check job concurrency for a blocked job (it will be checked by job emitter later)
|
|
if runJob.Status == actions_model.StatusWaiting {
|
|
runJob.Status, err = PrepareToStartJobWithConcurrency(ctx, runJob)
|
|
if err != nil {
|
|
return fmt.Errorf("prepare to start job with concurrency: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
|
|
if err := db.Insert(ctx, runJob); err != nil {
|
|
return err
|
|
}
|
|
|
|
runJobs = append(runJobs, runJob)
|
|
}
|
|
|
|
run.Status = actions_model.AggregateJobStatus(runJobs)
|
|
cols := []string{"status"}
|
|
if titleChanged {
|
|
cols = append(cols, "title")
|
|
}
|
|
if wfRawConcurrency != nil {
|
|
cols = append(cols, "raw_concurrency", "concurrency_group", "concurrency_cancel")
|
|
}
|
|
if err := actions_model.UpdateRun(ctx, run, cols...); err != nil {
|
|
return err
|
|
}
|
|
|
|
// if there is a job in the waiting status, increase tasks version.
|
|
if hasWaitingJobs {
|
|
if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|