mirror of
https://github.com/go-gitea/gitea.git
synced 2026-02-07 09:49:41 +09:00
fix(packages/container): data race when uploading container blobs concurrently (#36524)
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
@@ -43,13 +43,15 @@ func GetOrInsertBlob(ctx context.Context, pb *PackageBlob) (*PackageBlob, bool,
|
|||||||
|
|
||||||
existing := &PackageBlob{}
|
existing := &PackageBlob{}
|
||||||
|
|
||||||
has, err := e.Where(builder.Eq{
|
hashCond := builder.Eq{
|
||||||
"size": pb.Size,
|
"size": pb.Size,
|
||||||
"hash_md5": pb.HashMD5,
|
"hash_md5": pb.HashMD5,
|
||||||
"hash_sha1": pb.HashSHA1,
|
"hash_sha1": pb.HashSHA1,
|
||||||
"hash_sha256": pb.HashSHA256,
|
"hash_sha256": pb.HashSHA256,
|
||||||
"hash_sha512": pb.HashSHA512,
|
"hash_sha512": pb.HashSHA512,
|
||||||
}).Get(existing)
|
}
|
||||||
|
|
||||||
|
has, err := e.Where(hashCond).Get(existing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
@@ -57,6 +59,11 @@ func GetOrInsertBlob(ctx context.Context, pb *PackageBlob) (*PackageBlob, bool,
|
|||||||
return existing, true, nil
|
return existing, true, nil
|
||||||
}
|
}
|
||||||
if _, err = e.Insert(pb); err != nil {
|
if _, err = e.Insert(pb); err != nil {
|
||||||
|
// Handle race condition: another request may have inserted the same blob
|
||||||
|
// between our SELECT and INSERT. Retry the SELECT to get the existing blob.
|
||||||
|
if has, _ = e.Where(hashCond).Get(existing); has {
|
||||||
|
return existing, true, nil
|
||||||
|
}
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
return pb, false, nil
|
return pb, false, nil
|
||||||
|
|||||||
51
models/packages/package_blob_test.go
Normal file
51
models/packages/package_blob_test.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package packages
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models/unittest"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetOrInsertBlobConcurrent(t *testing.T) {
|
||||||
|
require.NoError(t, unittest.PrepareTestDatabase())
|
||||||
|
|
||||||
|
testBlob := PackageBlob{
|
||||||
|
Size: 123,
|
||||||
|
HashMD5: "md5",
|
||||||
|
HashSHA1: "sha1",
|
||||||
|
HashSHA256: "sha256",
|
||||||
|
HashSHA512: "sha512",
|
||||||
|
}
|
||||||
|
|
||||||
|
const numGoroutines = 3
|
||||||
|
var wg errgroup.Group
|
||||||
|
results := make([]*PackageBlob, numGoroutines)
|
||||||
|
existed := make([]bool, numGoroutines)
|
||||||
|
for idx := range numGoroutines {
|
||||||
|
wg.Go(func() error {
|
||||||
|
blob := testBlob // Create a copy of the test blob for each goroutine
|
||||||
|
var err error
|
||||||
|
results[idx], existed[idx], err = GetOrInsertBlob(t.Context(), &blob)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, wg.Wait())
|
||||||
|
|
||||||
|
// then: all GetOrInsertBlob succeeds with the same blob ID, and only one indicates it did not exist before
|
||||||
|
existedCount := 0
|
||||||
|
assert.NotNil(t, results[0])
|
||||||
|
for i := range numGoroutines {
|
||||||
|
assert.Equal(t, results[0].ID, results[i].ID)
|
||||||
|
if existed[i] {
|
||||||
|
existedCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, numGoroutines-1, existedCount)
|
||||||
|
}
|
||||||
@@ -26,9 +26,18 @@ import (
|
|||||||
|
|
||||||
// saveAsPackageBlob creates a package blob from an upload
|
// saveAsPackageBlob creates a package blob from an upload
|
||||||
// The uploaded blob gets stored in a special upload version to link them to the package/image
|
// The uploaded blob gets stored in a special upload version to link them to the package/image
|
||||||
func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { //nolint:unparam // PackageBlob is never used
|
// There will be concurrent uploading for the same blob, so it needs a global lock per blob hash
|
||||||
|
func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { //nolint:unparam //returned PackageBlob is never used
|
||||||
pb := packages_service.NewPackageBlob(hsr)
|
pb := packages_service.NewPackageBlob(hsr)
|
||||||
|
err := globallock.LockAndDo(ctx, "container-blob:"+pb.HashSHA256, func(ctx context.Context) error {
|
||||||
|
var err error
|
||||||
|
pb, err = saveAsPackageBlobInternal(ctx, hsr, pci, pb)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return pb, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveAsPackageBlobInternal(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo, pb *packages_model.PackageBlob) (*packages_model.PackageBlob, error) {
|
||||||
exists := false
|
exists := false
|
||||||
|
|
||||||
contentStore := packages_module.NewContentStore()
|
contentStore := packages_module.NewContentStore()
|
||||||
@@ -67,7 +76,7 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader
|
|||||||
return createFileForBlob(ctx, uploadVersion, pb)
|
return createFileForBlob(ctx, uploadVersion, pb)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !exists {
|
if !exists && pb != nil { // pb can be nil if GetOrInsertBlob failed
|
||||||
if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil {
|
if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil {
|
||||||
log.Error("Error deleting package blob from content store: %v", err)
|
log.Error("Error deleting package blob from content store: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,10 +63,10 @@ func NewBlobUploader(ctx context.Context, id string) (*BlobUploader, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &BlobUploader{
|
return &BlobUploader{
|
||||||
model,
|
PackageBlobUpload: model,
|
||||||
hash,
|
MultiHasher: hash,
|
||||||
f,
|
file: f,
|
||||||
false,
|
reading: false,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user