mirror of
https://github.com/go-gitea/gitea.git
synced 2026-04-11 19:03:10 +09:00
Fix flaky TestCatFileBatch/QueryTerminated test (#37159)
`TestCatFileBatch/QueryTerminated` relied on timing to distinguish `os.ErrClosed` vs `io.EOF` error paths. Replace `time.Sleep`-based synchronization with a channel-based hook on pipe close, making both error paths fully deterministic regardless of CI runner speed. Ref: https://github.com/go-gitea/gitea/actions/runs/24193070536/job/70615366804 Co-authored-by: Claude (Opus 4.6) <noreply@anthropic.com> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
@@ -13,63 +13,45 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/git/gitcmd"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
)
|
||||
|
||||
var catFileBatchDebugWaitClose atomic.Int64
|
||||
|
||||
type catFileBatchCommunicator struct {
|
||||
closeFunc func(err error)
|
||||
closeFunc atomic.Pointer[func(err error)]
|
||||
reqWriter io.Writer
|
||||
respReader *bufio.Reader
|
||||
debugGitCmd *gitcmd.Command
|
||||
}
|
||||
|
||||
func (b *catFileBatchCommunicator) Close() {
|
||||
if b.closeFunc != nil {
|
||||
b.closeFunc(nil)
|
||||
b.closeFunc = nil
|
||||
func (b *catFileBatchCommunicator) Close(err ...error) {
|
||||
if fn := b.closeFunc.Swap(nil); fn != nil {
|
||||
(*fn)(util.OptionalArg(err))
|
||||
}
|
||||
}
|
||||
|
||||
// newCatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function
|
||||
func newCatFileBatch(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command) (ret *catFileBatchCommunicator) {
|
||||
// newCatFileBatch opens git cat-file --batch/--batch-check/--batch-command command and prepares the stdin/stdout pipes for communication.
|
||||
func newCatFileBatch(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command) *catFileBatchCommunicator {
|
||||
ctx, ctxCancel := context.WithCancelCause(ctx)
|
||||
|
||||
// We often want to feed the commits in order into cat-file --batch, followed by their trees and subtrees as necessary.
|
||||
stdinWriter, stdoutReader, stdPipeClose := cmdCatFile.MakeStdinStdoutPipe()
|
||||
pipeClose := func() {
|
||||
if delay := catFileBatchDebugWaitClose.Load(); delay > 0 {
|
||||
time.Sleep(time.Duration(delay)) // for testing purpose only
|
||||
}
|
||||
stdPipeClose()
|
||||
}
|
||||
closeFunc := func(err error) {
|
||||
ctxCancel(err)
|
||||
pipeClose()
|
||||
}
|
||||
return newCatFileBatchWithCloseFunc(ctx, repoPath, cmdCatFile, stdinWriter, stdoutReader, closeFunc)
|
||||
}
|
||||
|
||||
func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command,
|
||||
stdinWriter gitcmd.PipeWriter, stdoutReader gitcmd.PipeReader, closeFunc func(err error),
|
||||
) *catFileBatchCommunicator {
|
||||
ret := &catFileBatchCommunicator{
|
||||
debugGitCmd: cmdCatFile,
|
||||
closeFunc: closeFunc,
|
||||
reqWriter: stdinWriter,
|
||||
respReader: bufio.NewReaderSize(stdoutReader, 32*1024), // use a buffered reader for rich operations
|
||||
}
|
||||
ret.closeFunc.Store(new(func(err error) {
|
||||
ctxCancel(err)
|
||||
stdPipeClose()
|
||||
}))
|
||||
|
||||
err := cmdCatFile.WithDir(repoPath).StartWithStderr(ctx)
|
||||
if err != nil {
|
||||
log.Error("Unable to start git command %v: %v", cmdCatFile.LogString(), err)
|
||||
// ideally here it should return the error, but it would require refactoring all callers
|
||||
// so just return a dummy communicator that does nothing, almost the same behavior as before, not bad
|
||||
closeFunc(err)
|
||||
ret.Close(err)
|
||||
return ret
|
||||
}
|
||||
|
||||
@@ -78,12 +60,33 @@ func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFi
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Error("cat-file --batch command failed in repo %s, error: %v", repoPath, err)
|
||||
}
|
||||
closeFunc(err)
|
||||
ret.Close(err)
|
||||
}()
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (b *catFileBatchCommunicator) debugKill() (ret struct {
|
||||
beforeClose chan struct{}
|
||||
blockClose chan struct{}
|
||||
afterClose chan struct{}
|
||||
},
|
||||
) {
|
||||
ret.beforeClose = make(chan struct{})
|
||||
ret.blockClose = make(chan struct{})
|
||||
ret.afterClose = make(chan struct{})
|
||||
oldCloseFunc := b.closeFunc.Load()
|
||||
b.closeFunc.Store(new(func(err error) {
|
||||
b.closeFunc.Store(nil)
|
||||
close(ret.beforeClose)
|
||||
<-ret.blockClose
|
||||
(*oldCloseFunc)(err)
|
||||
close(ret.afterClose)
|
||||
}))
|
||||
b.debugGitCmd.DebugKill()
|
||||
return ret
|
||||
}
|
||||
|
||||
// catFileBatchParseInfoLine reads the header line from cat-file --batch
|
||||
// We expect: <oid> SP <type> SP <size> LF
|
||||
// then leaving the rest of the stream "<contents> LF" to be read
|
||||
|
||||
@@ -7,9 +7,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/test"
|
||||
|
||||
@@ -39,13 +37,22 @@ func testCatFileBatch(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
simulateQueryTerminated := func(pipeCloseDelay, pipeReadDelay time.Duration) (errRead error) {
|
||||
catFileBatchDebugWaitClose.Store(int64(pipeCloseDelay))
|
||||
defer catFileBatchDebugWaitClose.Store(0)
|
||||
simulateQueryTerminated := func(t *testing.T, errBeforePipeClose, errAfterPipeClose error) {
|
||||
readError := func(t *testing.T, r io.Reader, expectedErr error) {
|
||||
if expectedErr == nil {
|
||||
return // expectedErr == nil means this read should be skipped
|
||||
}
|
||||
n, err := r.Read(make([]byte, 100))
|
||||
assert.Zero(t, n)
|
||||
assert.ErrorIs(t, err, expectedErr)
|
||||
}
|
||||
|
||||
batch, err := NewBatch(t.Context(), filepath.Join(testReposDir, "repo1_bare"))
|
||||
require.NoError(t, err)
|
||||
defer batch.Close()
|
||||
_, _ = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449")
|
||||
_, err = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449")
|
||||
require.NoError(t, err)
|
||||
|
||||
var c *catFileBatchCommunicator
|
||||
switch b := batch.(type) {
|
||||
case *catFileBatchLegacy:
|
||||
@@ -58,24 +65,18 @@ func testCatFileBatch(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Go(func() {
|
||||
time.Sleep(pipeReadDelay)
|
||||
var n int
|
||||
n, errRead = c.respReader.Read(make([]byte, 100))
|
||||
assert.Zero(t, n)
|
||||
})
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
c.debugGitCmd.DebugKill()
|
||||
wg.Wait()
|
||||
return errRead
|
||||
}
|
||||
require.NotEqual(t, errBeforePipeClose == nil, errAfterPipeClose == nil, "must set exactly one of the expected errors")
|
||||
|
||||
inceptor := c.debugKill()
|
||||
<-inceptor.beforeClose // wait for the command's Close to be called, the pipe is not closed yet
|
||||
readError(t, c.respReader, errBeforePipeClose) // then caller will read on an open pipe which will be closed soon
|
||||
close(inceptor.blockClose) // continue to close the pipe
|
||||
<-inceptor.afterClose // wait for the pipe to be closed
|
||||
readError(t, c.respReader, errAfterPipeClose) // then caller will read on a closed pipe
|
||||
}
|
||||
t.Run("QueryTerminated", func(t *testing.T) {
|
||||
err := simulateQueryTerminated(0, 20*time.Millisecond)
|
||||
assert.ErrorIs(t, err, os.ErrClosed) // pipes are closed faster
|
||||
err = simulateQueryTerminated(40*time.Millisecond, 20*time.Millisecond)
|
||||
assert.ErrorIs(t, err, io.EOF) // reader is faster
|
||||
simulateQueryTerminated(t, io.EOF, nil) // reader is faster
|
||||
simulateQueryTerminated(t, nil, os.ErrClosed) // pipes are closed faster
|
||||
})
|
||||
|
||||
batch, err := NewBatch(t.Context(), filepath.Join(testReposDir, "repo1_bare"))
|
||||
|
||||
Reference in New Issue
Block a user