mirror of
https://github.com/go-gitea/gitea.git
synced 2026-05-06 04:01:05 +09:00
This PR introduces a new `ActionRunAttempt` model and makes Actions
execution attempt-scoped.
**Main Changes**
- Each workflow run trigger generates a new `ActionRunAttempt`. The
triggered jobs are then associated with this new `ActionRunAttempt`
record.
- Each rerun now creates:
- a new `ActionRunAttempt` record for the workflow run
- a full new set of `ActionRunJob` records for the new
`ActionRunAttempt`
- For jobs that need to be rerun, the new job records are created as
runnable jobs in the new attempt.
- For jobs that do not need to be rerun, new job records are still
created in the new attempt, but they reuse the result of the previous
attempt instead of executing again.
- Introduce `rerunPlan` to manage each rerun and refactored rerun flow
into a two-phase plan-based model:
- `buildRerunPlan`
- `execRerunPlan`
- `RerunFailedWorkflowRun` and `RerunFailed` no longer directly derives
all jobs that need to be rerun; this step is now handled by
`buildRerunPlan`.
- Converted artifacts from run-scoped to attempt-scoped:
- uploads are now associated with `RunAttemptID`
- listing, download, and deletion resolve against the current attempt
- Added attempt-aware web Actions views:
- the default run page shows the latest attempt
(`/actions/runs/{run_id}`)
- previous attempt pages show jobs and artifacts for that attempt
(`/actions/runs/{run_id}/attempts/{attempt_num}`)
- New APIs:
- `/repos/{owner}/{repo}/actions/runs/{run}/attempts/{attempt}`
- `/repos/{owner}/{repo}/actions/runs/{run}/attempts/{attempt}/jobs`
- New configuration `MAX_RERUN_ATTEMPTS`
- https://gitea.com/gitea/docs/pulls/383
**Compatibility**
- Existing legacy runs use `LatestAttemptID = 0` and legacy jobs use
`RunAttemptID = 0`. Therefore, these fields can be used to identify
legacy runs and jobs and provide backward compatibility.
- If a legacy run is rerun, an `ActionRunAttempt` with `attempt=1` will
be created to represent the original execution. Then a new
`ActionRunAttempt` with `attempt=2` will be created for the real rerun.
- Existing artifact records are not backfilled; legacy artifacts
continue to use `RunAttemptID = 0`.
**Improvements**
- It is now easier to inspect and download logs from previous attempts.
-
[`run_attempt`](https://docs.github.com/en/actions/reference/workflows-and-actions/contexts#github-context)
semantics are now aligned with GitHub.
- > A unique number for each attempt of a particular workflow run in a
repository. This number begins at 1 for the workflow run's first
attempt, and increments with each re-run.
- Rerun behavior is now clearer and more explicit.
- Instead of mutating the status of previous jobs in place, each rerun
creates a new attempt with a full new set of job records.
- Artifacts produced by different reruns can now be listed separately.
Signed-off-by: Zettat123 <zettat123@gmail.com>
Co-authored-by: silverwind <me@silverwind.io>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Co-authored-by: Giteabot <teabot@gitea.io>
396 lines
12 KiB
Go
396 lines
12 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/models/db"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/optional"
|
|
"code.gitea.io/gitea/modules/setting"
|
|
"code.gitea.io/gitea/modules/storage"
|
|
)
|
|
|
|
type saveUploadChunkOptions struct {
|
|
start int64
|
|
end *int64
|
|
checkMd5 bool
|
|
}
|
|
|
|
func makeTmpPathNameV3(runID int64) string {
|
|
return fmt.Sprintf("tmp-upload/run-%d", runID)
|
|
}
|
|
|
|
func makeTmpPathNameV4(runID int64) string {
|
|
return fmt.Sprintf("tmp-upload/run-%d-v4", runID)
|
|
}
|
|
|
|
func makeChunkFilenameV3(runID, artifactID, start int64, endPtr *int64) string {
|
|
var end int64
|
|
if endPtr != nil {
|
|
end = *endPtr
|
|
}
|
|
return fmt.Sprintf("%d-%d-%d-%d.chunk", runID, artifactID, start, end)
|
|
}
|
|
|
|
func parseChunkFileItemV3(st storage.ObjectStorage, fpath string) (*chunkFileItem, error) {
|
|
baseName := path.Base(fpath)
|
|
if !strings.HasSuffix(baseName, ".chunk") {
|
|
return nil, errSkipChunkFile
|
|
}
|
|
|
|
var item chunkFileItem
|
|
var unusedRunID int64
|
|
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &unusedRunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
item.Path = fpath
|
|
if item.End == 0 {
|
|
fi, err := st.Stat(item.Path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
item.Size = fi.Size()
|
|
item.End = item.Start + item.Size - 1
|
|
} else {
|
|
item.Size = item.End - item.Start + 1
|
|
}
|
|
return &item, nil
|
|
}
|
|
|
|
func saveUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact,
|
|
runID int64, opts saveUploadChunkOptions,
|
|
) (writtenSize int64, retErr error) {
|
|
// build chunk store path
|
|
storagePath := fmt.Sprintf("%s/%s", makeTmpPathNameV3(runID), makeChunkFilenameV3(runID, artifact.ID, opts.start, opts.end))
|
|
|
|
// "end" is optional, so "contentSize=-1" means read until EOF
|
|
contentSize := int64(-1)
|
|
if opts.end != nil {
|
|
contentSize = *opts.end - opts.start + 1
|
|
}
|
|
|
|
var r io.Reader = ctx.Req.Body
|
|
var hasher hash.Hash
|
|
if opts.checkMd5 {
|
|
// use io.TeeReader to avoid reading all body to md5 sum.
|
|
// it writes data to hasher after reading end
|
|
// if hash is not matched, delete the read-end result
|
|
hasher = md5.New()
|
|
r = io.TeeReader(r, hasher)
|
|
}
|
|
// save chunk to storage
|
|
writtenSize, err := st.Save(storagePath, r, contentSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("save chunk to storage error: %v", err)
|
|
}
|
|
|
|
defer func() {
|
|
if retErr != nil {
|
|
if err := st.Delete(storagePath); err != nil {
|
|
log.Error("Error deleting chunk: %s, %v", storagePath, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
if contentSize != -1 && writtenSize != contentSize {
|
|
return writtenSize, fmt.Errorf("writtenSize %d does not match contentSize %d", writtenSize, contentSize)
|
|
}
|
|
if opts.checkMd5 {
|
|
// check md5
|
|
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
|
|
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
|
log.Debug("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
|
|
// if md5 not match, delete the chunk
|
|
if reqMd5String != chunkMd5String {
|
|
return writtenSize, errors.New("md5 not match")
|
|
}
|
|
}
|
|
log.Debug("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, size: %d", storagePath, writtenSize, artifact.ID, opts.start, contentSize)
|
|
return writtenSize, nil
|
|
}
|
|
|
|
func saveUploadChunkV3GetTotalSize(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID int64) (totalSize int64, _ error) {
|
|
// parse content-range header, format: bytes 0-1023/146515
|
|
contentRange := ctx.Req.Header.Get("Content-Range")
|
|
var start, end int64
|
|
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &totalSize); err != nil {
|
|
return 0, fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
_, err := saveUploadChunkV3(st, ctx, artifact, runID, saveUploadChunkOptions{start: start, end: &end, checkMd5: true})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return totalSize, nil
|
|
}
|
|
|
|
// Returns uploaded length
|
|
func appendUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID, start int64) (int64, error) {
|
|
opts := saveUploadChunkOptions{start: start}
|
|
if ctx.Req.ContentLength > 0 {
|
|
end := start + ctx.Req.ContentLength - 1
|
|
opts.end = &end
|
|
}
|
|
return saveUploadChunkV3(st, ctx, artifact, runID, opts)
|
|
}
|
|
|
|
type chunkFileItem struct {
|
|
ArtifactID int64
|
|
Path string
|
|
|
|
// these offset/size related fields might be missing when parsing, they will be filled in the listing functions
|
|
Size int64
|
|
Start int64
|
|
End int64 // inclusive: Size=10, Start=0, End=9
|
|
|
|
ChunkName string // v4 only
|
|
}
|
|
|
|
func listV3UnorderedChunksMapByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
|
|
storageDir := makeTmpPathNameV3(runID)
|
|
var chunks []*chunkFileItem
|
|
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
|
item, err := parseChunkFileItemV3(st, fpath)
|
|
if errors.Is(err, errSkipChunkFile) {
|
|
return nil
|
|
} else if err != nil {
|
|
return fmt.Errorf("unable to parse chunk name: %v", fpath)
|
|
}
|
|
chunks = append(chunks, item)
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
// chunks group by artifact id
|
|
chunksMap := make(map[int64][]*chunkFileItem)
|
|
for _, c := range chunks {
|
|
chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
|
|
}
|
|
return chunksMap, nil
|
|
}
|
|
|
|
func listOrderedChunksForArtifact(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
|
|
emptyListAsError := func(chunks []*chunkFileItem) ([]*chunkFileItem, error) {
|
|
if len(chunks) == 0 {
|
|
return nil, fmt.Errorf("no chunk found for artifact id: %d", artifactID)
|
|
}
|
|
return chunks, nil
|
|
}
|
|
|
|
storageDir := makeTmpPathNameV4(runID)
|
|
var chunks []*chunkFileItem
|
|
var chunkMapV4 map[string]*chunkFileItem
|
|
|
|
if blist != nil {
|
|
// make a dummy map for quick lookup of chunk names, the values are nil now and will be filled after iterating storage objects
|
|
chunkMapV4 = map[string]*chunkFileItem{}
|
|
for _, name := range blist.Latest {
|
|
chunkMapV4[name] = nil
|
|
}
|
|
}
|
|
|
|
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
|
item, err := parseChunkFileItemV4(st, artifactID, fpath)
|
|
if errors.Is(err, errSkipChunkFile) {
|
|
return nil
|
|
} else if err != nil {
|
|
return fmt.Errorf("unable to parse chunk name: %v", fpath)
|
|
}
|
|
|
|
// Single chunk upload with block id
|
|
if _, ok := chunkMapV4[item.ChunkName]; ok {
|
|
chunkMapV4[item.ChunkName] = item
|
|
} else if chunkMapV4 == nil {
|
|
if chunks != nil {
|
|
return errors.New("blockmap is required for chunks > 1")
|
|
}
|
|
chunks = []*chunkFileItem{item}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if blist == nil && chunks == nil {
|
|
chunkUnorderedItemsMapV3, err := listV3UnorderedChunksMapByRunID(st, runID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
chunks = chunkUnorderedItemsMapV3[artifactID]
|
|
sort.Slice(chunks, func(i, j int) bool {
|
|
return chunks[i].Start < chunks[j].Start
|
|
})
|
|
return emptyListAsError(chunks)
|
|
}
|
|
|
|
if len(chunks) == 0 && blist != nil {
|
|
for i, name := range blist.Latest {
|
|
chunk := chunkMapV4[name]
|
|
if chunk == nil {
|
|
return nil, fmt.Errorf("missing chunk (%d/%d): %s", i, len(blist.Latest), name)
|
|
}
|
|
chunks = append(chunks, chunk)
|
|
}
|
|
}
|
|
for i, chunk := range chunks {
|
|
if i == 0 {
|
|
chunk.End += chunk.Size - 1
|
|
} else {
|
|
chunk.Start = chunkMapV4[blist.Latest[i-1]].End + 1
|
|
chunk.End = chunk.Start + chunk.Size - 1
|
|
}
|
|
}
|
|
return emptyListAsError(chunks)
|
|
}
|
|
|
|
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID, runAttemptID int64, artifactName string) error {
|
|
// read all db artifacts by name
|
|
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
|
|
RunID: runID,
|
|
RunAttemptID: optional.Some(runAttemptID),
|
|
ArtifactName: artifactName,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// read all uploading chunks from storage
|
|
unorderedChunksMap, err := listV3UnorderedChunksMapByRunID(st, runID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// range db artifacts to merge chunks
|
|
for _, art := range artifacts {
|
|
chunks, ok := unorderedChunksMap[art.ID]
|
|
if !ok {
|
|
log.Debug("artifact %d chunks not found", art.ID)
|
|
continue
|
|
}
|
|
if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func generateArtifactStoragePath(artifact *actions.ActionArtifact) string {
|
|
// if chunk is gzip, use gz as extension
|
|
// download-artifact action will use content-encoding header to decide if it should decompress the file
|
|
extension := "chunk"
|
|
if artifact.ContentEncodingOrType == actions.ContentEncodingV3Gzip {
|
|
extension = "chunk.gz"
|
|
}
|
|
|
|
return fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
|
|
}
|
|
|
|
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
|
|
sort.Slice(chunks, func(i, j int) bool {
|
|
return chunks[i].Start < chunks[j].Start
|
|
})
|
|
allChunks := make([]*chunkFileItem, 0)
|
|
startAt := int64(-1)
|
|
// check if all chunks are uploaded and in order and clean repeated chunks
|
|
for _, c := range chunks {
|
|
// startAt is -1 means this is the first chunk
|
|
// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
|
|
// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
|
|
if c.Start == (startAt + 1) {
|
|
allChunks = append(allChunks, c)
|
|
startAt = c.End
|
|
}
|
|
}
|
|
// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
|
|
if startAt+1 != artifact.FileCompressedSize {
|
|
log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
|
|
return nil
|
|
}
|
|
// use multiReader
|
|
readers := make([]io.Reader, 0, len(allChunks))
|
|
closeReaders := func() {
|
|
for _, r := range readers {
|
|
_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
|
|
}
|
|
readers = nil
|
|
}
|
|
defer closeReaders()
|
|
for _, c := range allChunks {
|
|
var readCloser io.ReadCloser
|
|
var err error
|
|
if readCloser, err = st.Open(c.Path); err != nil {
|
|
return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
|
|
}
|
|
readers = append(readers, readCloser)
|
|
}
|
|
mergedReader := io.MultiReader(readers...)
|
|
shaPrefix := "sha256:"
|
|
var hashSha256 hash.Hash
|
|
if strings.HasPrefix(checksum, shaPrefix) {
|
|
hashSha256 = sha256.New()
|
|
} else if checksum != "" {
|
|
setting.PanicInDevOrTesting("unsupported checksum format: %s, will skip the checksum verification", checksum)
|
|
}
|
|
if hashSha256 != nil {
|
|
mergedReader = io.TeeReader(mergedReader, hashSha256)
|
|
}
|
|
|
|
// save merged file
|
|
storagePath := generateArtifactStoragePath(artifact)
|
|
written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize)
|
|
if err != nil {
|
|
return fmt.Errorf("save merged file error: %v", err)
|
|
}
|
|
if written != artifact.FileCompressedSize {
|
|
return errors.New("merged file size is not equal to chunk length")
|
|
}
|
|
|
|
defer func() {
|
|
closeReaders() // close before delete
|
|
// drop chunks
|
|
for _, c := range chunks {
|
|
if err := st.Delete(c.Path); err != nil {
|
|
log.Warn("Error deleting chunk: %s, %v", c.Path, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
if hashSha256 != nil {
|
|
rawChecksum := hashSha256.Sum(nil)
|
|
actualChecksum := hex.EncodeToString(rawChecksum)
|
|
if !strings.HasSuffix(checksum, actualChecksum) {
|
|
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
|
|
}
|
|
}
|
|
|
|
// save storage path to artifact
|
|
log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
|
|
// if artifact is already uploaded, delete the old file
|
|
if artifact.StoragePath != "" {
|
|
if err := st.Delete(artifact.StoragePath); err != nil {
|
|
log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
|
|
}
|
|
}
|
|
|
|
artifact.StoragePath = storagePath
|
|
artifact.Status = actions.ArtifactStatusUploadConfirmed
|
|
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
|
|
return fmt.Errorf("update artifact error: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|