diff --git a/modules/git/catfile_batch_reader.go b/modules/git/catfile_batch_reader.go index 0c8fc740be..5727c4a8ac 100644 --- a/modules/git/catfile_batch_reader.go +++ b/modules/git/catfile_batch_reader.go @@ -13,63 +13,45 @@ import ( "strconv" "strings" "sync/atomic" - "time" "code.gitea.io/gitea/modules/git/gitcmd" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" ) -var catFileBatchDebugWaitClose atomic.Int64 - type catFileBatchCommunicator struct { - closeFunc func(err error) + closeFunc atomic.Pointer[func(err error)] reqWriter io.Writer respReader *bufio.Reader debugGitCmd *gitcmd.Command } -func (b *catFileBatchCommunicator) Close() { - if b.closeFunc != nil { - b.closeFunc(nil) - b.closeFunc = nil +func (b *catFileBatchCommunicator) Close(err ...error) { + if fn := b.closeFunc.Swap(nil); fn != nil { + (*fn)(util.OptionalArg(err)) } } -// newCatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function -func newCatFileBatch(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command) (ret *catFileBatchCommunicator) { +// newCatFileBatch opens git cat-file --batch/--batch-check/--batch-command command and prepares the stdin/stdout pipes for communication. +func newCatFileBatch(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command) *catFileBatchCommunicator { ctx, ctxCancel := context.WithCancelCause(ctx) - - // We often want to feed the commits in order into cat-file --batch, followed by their trees and subtrees as necessary. stdinWriter, stdoutReader, stdPipeClose := cmdCatFile.MakeStdinStdoutPipe() - pipeClose := func() { - if delay := catFileBatchDebugWaitClose.Load(); delay > 0 { - time.Sleep(time.Duration(delay)) // for testing purpose only - } - stdPipeClose() - } - closeFunc := func(err error) { - ctxCancel(err) - pipeClose() - } - return newCatFileBatchWithCloseFunc(ctx, repoPath, cmdCatFile, stdinWriter, stdoutReader, closeFunc) -} - -func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFile *gitcmd.Command, - stdinWriter gitcmd.PipeWriter, stdoutReader gitcmd.PipeReader, closeFunc func(err error), -) *catFileBatchCommunicator { ret := &catFileBatchCommunicator{ debugGitCmd: cmdCatFile, - closeFunc: closeFunc, reqWriter: stdinWriter, respReader: bufio.NewReaderSize(stdoutReader, 32*1024), // use a buffered reader for rich operations } + ret.closeFunc.Store(new(func(err error) { + ctxCancel(err) + stdPipeClose() + })) err := cmdCatFile.WithDir(repoPath).StartWithStderr(ctx) if err != nil { log.Error("Unable to start git command %v: %v", cmdCatFile.LogString(), err) // ideally here it should return the error, but it would require refactoring all callers // so just return a dummy communicator that does nothing, almost the same behavior as before, not bad - closeFunc(err) + ret.Close(err) return ret } @@ -78,12 +60,33 @@ func newCatFileBatchWithCloseFunc(ctx context.Context, repoPath string, cmdCatFi if err != nil && !errors.Is(err, context.Canceled) { log.Error("cat-file --batch command failed in repo %s, error: %v", repoPath, err) } - closeFunc(err) + ret.Close(err) }() return ret } +func (b *catFileBatchCommunicator) debugKill() (ret struct { + beforeClose chan struct{} + blockClose chan struct{} + afterClose chan struct{} +}, +) { + ret.beforeClose = make(chan struct{}) + ret.blockClose = make(chan struct{}) + ret.afterClose = make(chan struct{}) + oldCloseFunc := b.closeFunc.Load() + b.closeFunc.Store(new(func(err error) { + b.closeFunc.Store(nil) + close(ret.beforeClose) + <-ret.blockClose + (*oldCloseFunc)(err) + close(ret.afterClose) + })) + b.debugGitCmd.DebugKill() + return ret +} + // catFileBatchParseInfoLine reads the header line from cat-file --batch // We expect: SP SP LF // then leaving the rest of the stream " LF" to be read diff --git a/modules/git/catfile_batch_test.go b/modules/git/catfile_batch_test.go index 69662ffc1a..782d34d249 100644 --- a/modules/git/catfile_batch_test.go +++ b/modules/git/catfile_batch_test.go @@ -7,9 +7,7 @@ import ( "io" "os" "path/filepath" - "sync" "testing" - "time" "code.gitea.io/gitea/modules/test" @@ -39,13 +37,22 @@ func testCatFileBatch(t *testing.T) { require.Error(t, err) }) - simulateQueryTerminated := func(pipeCloseDelay, pipeReadDelay time.Duration) (errRead error) { - catFileBatchDebugWaitClose.Store(int64(pipeCloseDelay)) - defer catFileBatchDebugWaitClose.Store(0) + simulateQueryTerminated := func(t *testing.T, errBeforePipeClose, errAfterPipeClose error) { + readError := func(t *testing.T, r io.Reader, expectedErr error) { + if expectedErr == nil { + return // expectedErr == nil means this read should be skipped + } + n, err := r.Read(make([]byte, 100)) + assert.Zero(t, n) + assert.ErrorIs(t, err, expectedErr) + } + batch, err := NewBatch(t.Context(), filepath.Join(testReposDir, "repo1_bare")) require.NoError(t, err) defer batch.Close() - _, _ = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449") + _, err = batch.QueryInfo("e2129701f1a4d54dc44f03c93bca0a2aec7c5449") + require.NoError(t, err) + var c *catFileBatchCommunicator switch b := batch.(type) { case *catFileBatchLegacy: @@ -58,24 +65,18 @@ func testCatFileBatch(t *testing.T) { t.FailNow() } - wg := sync.WaitGroup{} - wg.Go(func() { - time.Sleep(pipeReadDelay) - var n int - n, errRead = c.respReader.Read(make([]byte, 100)) - assert.Zero(t, n) - }) - time.Sleep(10 * time.Millisecond) - c.debugGitCmd.DebugKill() - wg.Wait() - return errRead - } + require.NotEqual(t, errBeforePipeClose == nil, errAfterPipeClose == nil, "must set exactly one of the expected errors") + inceptor := c.debugKill() + <-inceptor.beforeClose // wait for the command's Close to be called, the pipe is not closed yet + readError(t, c.respReader, errBeforePipeClose) // then caller will read on an open pipe which will be closed soon + close(inceptor.blockClose) // continue to close the pipe + <-inceptor.afterClose // wait for the pipe to be closed + readError(t, c.respReader, errAfterPipeClose) // then caller will read on a closed pipe + } t.Run("QueryTerminated", func(t *testing.T) { - err := simulateQueryTerminated(0, 20*time.Millisecond) - assert.ErrorIs(t, err, os.ErrClosed) // pipes are closed faster - err = simulateQueryTerminated(40*time.Millisecond, 20*time.Millisecond) - assert.ErrorIs(t, err, io.EOF) // reader is faster + simulateQueryTerminated(t, io.EOF, nil) // reader is faster + simulateQueryTerminated(t, nil, os.ErrClosed) // pipes are closed faster }) batch, err := NewBatch(t.Context(), filepath.Join(testReposDir, "repo1_bare"))