mirror of
https://github.com/go-gitea/gitea.git
synced 2026-04-28 20:34:30 +09:00
Follow up #36842.

Migration `326` can be prohibitively slow on large instances because it scans and rewrites all commit status target URLs generated by Gitea Actions in the database. This PR refactors migration `326` to perform a partial update instead of rewriting every legacy target URL. The reason for this partial rewrite is that **smaller legacy run/job indexes are the most likely to be ambiguous with run/job ID-based URLs** during runtime resolution, so this change prioritizes that subset while avoiding the cost of rewriting all legacy records.

To preserve access to old links, this PR introduces `resolveCurrentRunForView` to handle both ID-based URLs and index-based URLs:

- For job pages (`/actions/runs/{run}/jobs/{job}`), it first tries to confirm that the URL is ID-based. It does so by checking whether `{job}` can be treated as an existing job ID in the repository and whether that job belongs to `{run}`. If that match cannot be confirmed, it falls back to treating the URL as legacy `run index + job index`, resolves the corresponding run and job, and redirects to the correct ID-based URL.
- When both ID-based and index-based interpretations are valid at the same time, the resolver **prefers the ID-based interpretation by default**. For example, if a repository contains one run-job pair (`run_id=3, run_index=2, job_id=4`), and also another run-job pair (`run_id=1100, run_index=3, job_id=1200, job_index=4`), then `/actions/runs/3/jobs/4` is ambiguous. In that case, the resolver treats it as the ID-based URL by default and shows the page for `run_id=3, job_id=4`. Users can still explicitly force the legacy index-based interpretation with `?by_index=1`, which would resolve the same URL to `/actions/runs/1100/jobs/1200`.
- For run summary pages (`/actions/runs/{run}`), it uses a best-effort strategy: by default it first treats `{run}` as a run ID, and if no such run exists in the repository, it falls back to treating `{run}` as a legacy run index and redirects to the ID-based URL. Users can also explicitly force the legacy interpretation with `?by_index=1`.
- This summary-page compatibility is best-effort, not a strict ambiguity check. For example, if a repository contains two runs: runA (`id=7, index=3`) and runB (`id=99, index=7`), then `/actions/runs/7` will resolve to runA by default, even though the old index-based URL originally referred to runB.

The table below shows how valid legacy index-based target URLs are handled before and after migration `326`. Lower-range legacy URLs are rewritten to ID-based URLs, while higher-range legacy URLs remain unchanged in the database but are still handled correctly by `resolveCurrentRunForView` at runtime.

| run_id | run_index | job_id | job_index | old target URL | updated by migration 326 | current target URL | can be resolved correctly |
|---|---|---|---|---|---|---|---|
| 3 | 2 | 4 | 1 | `/user2/repo2/actions/runs/2/jobs/1` | true | `/user2/repo2/actions/runs/3/jobs/4` | true |
| 4 | 3 | 8 | 4 | `/user2/repo2/actions/runs/3/jobs/4` | true | `/user2/repo2/actions/runs/4/jobs/8` | true (without migration 326, this URL will resolve to run(`id=3`)) |
| 80 | 20 | 170 | 0 | `/user2/repo2/actions/runs/20/jobs/0` | true | `/user2/repo2/actions/runs/80/jobs/170` | true |
| 1500 | 900 | 1600 | 0 | `/user2/repo2/actions/runs/900/jobs/0` | false | `/user2/repo2/actions/runs/900/jobs/0` | true |
| 2400 | 1500 | 2600 | 0 | `/user2/repo2/actions/runs/1500/jobs/0` | false | `/user2/repo2/actions/runs/1500/jobs/0` | true |
| 2400 | 1500 | 2601 | 1 | `/user2/repo2/actions/runs/1500/jobs/1` | false | `/user2/repo2/actions/runs/1500/jobs/1` | true |

For users who already ran the old migration `326`, this change has no functional impact. Their historical URLs are already stored in the ID-based form, and ID-based URLs continue to resolve correctly. For users who have not run the old migration `326`, only a subset of legacy target URLs will now be rewritten during upgrade. This avoids the extreme runtime cost of the previous full migration, while all remaining legacy target URLs continue to work through the web-layer compatibility logic.

Many thanks to @wxiaoguang for the suggestions.
292 lines
9.0 KiB
Go
292 lines
9.0 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/models/db"
|
|
repo_model "code.gitea.io/gitea/models/repo"
|
|
"code.gitea.io/gitea/modules/actions/jobparser"
|
|
"code.gitea.io/gitea/modules/timeutil"
|
|
"code.gitea.io/gitea/modules/util"
|
|
|
|
"xorm.io/builder"
|
|
)
|
|
|
|
const (
	// MaxJobNumPerRun is the upper bound on the number of jobs one run may hold.
	// Reference for the value:
	// https://docs.github.com/en/actions/reference/limits#existing-system-limits
	// TODO: check this limit when creating jobs
	MaxJobNumPerRun = 256
)
|
|
|
|
// ActionRunJob represents a job of a run
|
|
type ActionRunJob struct {
|
|
ID int64
|
|
RunID int64 `xorm:"index"`
|
|
Run *ActionRun `xorm:"-"`
|
|
RepoID int64 `xorm:"index(repo_concurrency)"`
|
|
Repo *repo_model.Repository `xorm:"-"`
|
|
OwnerID int64 `xorm:"index"`
|
|
CommitSHA string `xorm:"index"`
|
|
IsForkPullRequest bool
|
|
Name string `xorm:"VARCHAR(255)"`
|
|
Attempt int64
|
|
|
|
// WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse
|
|
// it should contain exactly one job with global workflow fields for this model
|
|
WorkflowPayload []byte
|
|
|
|
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
|
|
Needs []string `xorm:"JSON TEXT"`
|
|
RunsOn []string `xorm:"JSON TEXT"`
|
|
TaskID int64 // the latest task of the job
|
|
Status Status `xorm:"index"`
|
|
|
|
RawConcurrency string // raw concurrency from job YAML's "concurrency" section
|
|
|
|
// IsConcurrencyEvaluated is only valid/needed when this job's RawConcurrency is not empty.
|
|
// If RawConcurrency can't be evaluated (e.g. depend on other job's outputs or have errors), this field will be false.
|
|
// If RawConcurrency has been successfully evaluated, this field will be true, ConcurrencyGroup and ConcurrencyCancel are also set.
|
|
IsConcurrencyEvaluated bool
|
|
|
|
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
|
|
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
|
|
|
|
// TokenPermissions stores the explicit permissions from workflow/job YAML (no org/repo clamps applied).
|
|
// Org/repo clamps are enforced when the token is used at runtime.
|
|
// It is JSON-encoded repo_model.ActionsTokenPermissions and may be empty if not specified.
|
|
TokenPermissions *repo_model.ActionsTokenPermissions `xorm:"JSON TEXT"`
|
|
|
|
Started timeutil.TimeStamp
|
|
Stopped timeutil.TimeStamp
|
|
Created timeutil.TimeStamp `xorm:"created"`
|
|
Updated timeutil.TimeStamp `xorm:"updated index"`
|
|
}
|
|
|
|
func init() {
|
|
db.RegisterModel(new(ActionRunJob))
|
|
}
|
|
|
|
func (job *ActionRunJob) Duration() time.Duration {
|
|
return calculateDuration(job.Started, job.Stopped, job.Status)
|
|
}
|
|
|
|
func (job *ActionRunJob) LoadRun(ctx context.Context) error {
|
|
if job.Run == nil {
|
|
run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
job.Run = run
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (job *ActionRunJob) LoadRepo(ctx context.Context) error {
|
|
if job.Repo == nil {
|
|
repo, err := repo_model.GetRepositoryByID(ctx, job.RepoID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
job.Repo = repo
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// LoadAttributes load Run if not loaded
|
|
func (job *ActionRunJob) LoadAttributes(ctx context.Context) error {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
|
|
if err := job.LoadRun(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return job.Run.LoadAttributes(ctx)
|
|
}
|
|
|
|
// ParseJob parses the job structure from the ActionRunJob.WorkflowPayload
|
|
func (job *ActionRunJob) ParseJob() (*jobparser.Job, error) {
|
|
// job.WorkflowPayload is a SingleWorkflow created from an ActionRun's workflow, which exactly contains this job's YAML definition.
|
|
// Ideally it shouldn't be called "Workflow", it is just a job with global workflow fields + trigger
|
|
parsedWorkflows, err := jobparser.Parse(job.WorkflowPayload)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("job %d single workflow: unable to parse: %w", job.ID, err)
|
|
} else if len(parsedWorkflows) != 1 {
|
|
return nil, fmt.Errorf("job %d single workflow: not single workflow", job.ID)
|
|
}
|
|
_, workflowJob := parsedWorkflows[0].Job()
|
|
if workflowJob == nil {
|
|
// it shouldn't happen, and since the callers don't check nil, so return an error instead of nil
|
|
return nil, util.ErrorWrap(util.ErrNotExist, "job %d single workflow: payload doesn't contain a job", job.ID)
|
|
}
|
|
return workflowJob, nil
|
|
}
|
|
|
|
func GetRunJobByRepoAndID(ctx context.Context, repoID, jobID int64) (*ActionRunJob, error) {
|
|
var job ActionRunJob
|
|
has, err := db.GetEngine(ctx).Where("id=? AND repo_id=?", jobID, repoID).Get(&job)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if !has {
|
|
return nil, fmt.Errorf("run job with id %d: %w", jobID, util.ErrNotExist)
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
func GetRunJobByRunAndID(ctx context.Context, runID, jobID int64) (*ActionRunJob, error) {
|
|
var job ActionRunJob
|
|
has, err := db.GetEngine(ctx).Where("id=? AND run_id=?", jobID, runID).Get(&job)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if !has {
|
|
return nil, fmt.Errorf("run job with id %d: %w", jobID, util.ErrNotExist)
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) {
|
|
var jobs []*ActionRunJob
|
|
if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil {
|
|
return nil, err
|
|
}
|
|
return jobs, nil
|
|
}
|
|
|
|
func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
|
|
e := db.GetEngine(ctx)
|
|
|
|
sess := e.ID(job.ID)
|
|
if len(cols) > 0 {
|
|
sess.Cols(cols...)
|
|
}
|
|
|
|
if cond != nil {
|
|
sess.Where(cond)
|
|
}
|
|
|
|
affected, err := sess.Update(job)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
|
|
return affected, nil
|
|
}
|
|
|
|
if slices.Contains(cols, "status") && job.Status.IsWaiting() {
|
|
// if the status of job changes to waiting again, increase tasks version.
|
|
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
if job.RunID == 0 {
|
|
var err error
|
|
if job, err = GetRunJobByRepoAndID(ctx, job.RepoID, job.ID); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
{
|
|
// Other goroutines may aggregate the status of the run and update it too.
|
|
// So we need load the run and its jobs before updating the run.
|
|
run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
run.Status = AggregateJobStatus(jobs)
|
|
if run.Started.IsZero() && run.Status.IsRunning() {
|
|
run.Started = timeutil.TimeStampNow()
|
|
}
|
|
if run.Stopped.IsZero() && run.Status.IsDone() {
|
|
run.Stopped = timeutil.TimeStampNow()
|
|
}
|
|
if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
|
|
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
|
}
|
|
}
|
|
|
|
return affected, nil
|
|
}
|
|
|
|
func AggregateJobStatus(jobs []*ActionRunJob) Status {
|
|
allSuccessOrSkipped := len(jobs) != 0
|
|
allSkipped := len(jobs) != 0
|
|
var hasFailure, hasCancelled, hasWaiting, hasRunning, hasBlocked bool
|
|
for _, job := range jobs {
|
|
allSuccessOrSkipped = allSuccessOrSkipped && (job.Status == StatusSuccess || job.Status == StatusSkipped)
|
|
allSkipped = allSkipped && job.Status == StatusSkipped
|
|
hasFailure = hasFailure || job.Status == StatusFailure
|
|
hasCancelled = hasCancelled || job.Status == StatusCancelled
|
|
hasWaiting = hasWaiting || job.Status == StatusWaiting
|
|
hasRunning = hasRunning || job.Status == StatusRunning
|
|
hasBlocked = hasBlocked || job.Status == StatusBlocked
|
|
}
|
|
switch {
|
|
case allSkipped:
|
|
return StatusSkipped
|
|
case allSuccessOrSkipped:
|
|
return StatusSuccess
|
|
case hasCancelled:
|
|
return StatusCancelled
|
|
case hasRunning:
|
|
return StatusRunning
|
|
case hasWaiting:
|
|
return StatusWaiting
|
|
case hasFailure:
|
|
return StatusFailure
|
|
case hasBlocked:
|
|
return StatusBlocked
|
|
default:
|
|
return StatusUnknown // it shouldn't happen
|
|
}
|
|
}
|
|
|
|
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) (jobsToCancel []*ActionRunJob, _ error) {
|
|
if job.RawConcurrency == "" {
|
|
return nil, nil
|
|
}
|
|
if !job.IsConcurrencyEvaluated {
|
|
return nil, nil
|
|
}
|
|
if job.ConcurrencyGroup == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
statusFindOption := []Status{StatusWaiting, StatusBlocked}
|
|
if job.ConcurrencyCancel {
|
|
statusFindOption = append(statusFindOption, StatusRunning)
|
|
}
|
|
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
|
|
}
|
|
jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
|
|
jobsToCancel = append(jobsToCancel, jobs...)
|
|
|
|
// cancel runs in the same concurrency group
|
|
for _, run := range runs {
|
|
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
|
|
RunID: run.ID,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
|
|
}
|
|
jobsToCancel = append(jobsToCancel, jobs...)
|
|
}
|
|
|
|
return CancelJobs(ctx, jobsToCancel)
|
|
}
|