mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-29 10:57:44 +09:00 
			
		
		
		
	Move Bleve and Elastic code indexers to use a common cat-file --batch (#14781)
* Extract out the common cat-file batch calls Signed-off-by: Andrew Thornton <art27@cantab.net> * Move bleve and elastic indexers to use a common cat-file --batch when indexing Signed-off-by: Andrew Thornton <art27@cantab.net> * move catfilebatch to batch_reader and rename to batch_reader.go Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: Lauris BH <lauris@nix.lv>
This commit is contained in:
		| @@ -2,17 +2,48 @@ | |||||||
| // Use of this source code is governed by a MIT-style | // Use of this source code is governed by a MIT-style | ||||||
| // license that can be found in the LICENSE file. | // license that can be found in the LICENSE file. | ||||||
| 
 | 
 | ||||||
| // +build !gogit |  | ||||||
| 
 |  | ||||||
| package git | package git | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"bufio" | 	"bufio" | ||||||
| 	"bytes" | 	"bytes" | ||||||
|  | 	"io" | ||||||
| 	"math" | 	"math" | ||||||
| 	"strconv" | 	"strconv" | ||||||
|  | 	"strings" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | // CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function | ||||||
|  | func CatFileBatch(repoPath string) (*io.PipeWriter, *bufio.Reader, func()) { | ||||||
|  | 	// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. | ||||||
|  | 	// so let's create a batch stdin and stdout | ||||||
|  | 	batchStdinReader, batchStdinWriter := io.Pipe() | ||||||
|  | 	batchStdoutReader, batchStdoutWriter := io.Pipe() | ||||||
|  | 	cancel := func() { | ||||||
|  | 		_ = batchStdinReader.Close() | ||||||
|  | 		_ = batchStdinWriter.Close() | ||||||
|  | 		_ = batchStdoutReader.Close() | ||||||
|  | 		_ = batchStdoutWriter.Close() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
|  | 		stderr := strings.Builder{} | ||||||
|  | 		err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repoPath, batchStdoutWriter, &stderr, batchStdinReader) | ||||||
|  | 		if err != nil { | ||||||
|  | 			_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) | ||||||
|  | 			_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) | ||||||
|  | 		} else { | ||||||
|  | 			_ = batchStdoutWriter.Close() | ||||||
|  | 			_ = batchStdinReader.Close() | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	// For simplicities sake we'll us a buffered reader to read from the cat-file --batch | ||||||
|  | 	batchReader := bufio.NewReader(batchStdoutReader) | ||||||
|  | 
 | ||||||
|  | 	return batchStdinWriter, batchReader, cancel | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // ReadBatchLine reads the header line from cat-file --batch | // ReadBatchLine reads the header line from cat-file --batch | ||||||
| // We expect: | // We expect: | ||||||
| // <sha> SP <type> SP <size> LF | // <sha> SP <type> SP <size> LF | ||||||
| @@ -141,29 +141,8 @@ func GetLastCommitForPaths(commit *Commit, treePath string, paths []string) ([]* | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	// We feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. | 	batchStdinWriter, batchReader, cancel := CatFileBatch(commit.repo.Path) | ||||||
| 	// so let's create a batch stdin and stdout | 	defer cancel() | ||||||
| 	batchStdinReader, batchStdinWriter := io.Pipe() |  | ||||||
| 	batchStdoutReader, batchStdoutWriter := io.Pipe() |  | ||||||
| 	defer func() { |  | ||||||
| 		_ = batchStdinReader.Close() |  | ||||||
| 		_ = batchStdinWriter.Close() |  | ||||||
| 		_ = batchStdoutReader.Close() |  | ||||||
| 		_ = batchStdoutWriter.Close() |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	go func() { |  | ||||||
| 		stderr := strings.Builder{} |  | ||||||
| 		err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(commit.repo.Path, batchStdoutWriter, &stderr, batchStdinReader) |  | ||||||
| 		if err != nil { |  | ||||||
| 			_ = revListWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) |  | ||||||
| 		} else { |  | ||||||
| 			_ = revListWriter.Close() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	// For simplicities sake we'll us a buffered reader |  | ||||||
| 	batchReader := bufio.NewReader(batchStdoutReader) |  | ||||||
|  |  | ||||||
| 	mapsize := 4096 | 	mapsize := 4096 | ||||||
| 	if len(paths) > mapsize { | 	if len(paths) > mapsize { | ||||||
|   | |||||||
| @@ -64,27 +64,8 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { | |||||||
|  |  | ||||||
| 	// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. | 	// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. | ||||||
| 	// so let's create a batch stdin and stdout | 	// so let's create a batch stdin and stdout | ||||||
| 	batchStdinReader, batchStdinWriter := io.Pipe() | 	batchStdinWriter, batchReader, cancel := git.CatFileBatch(repo.Path) | ||||||
| 	batchStdoutReader, batchStdoutWriter := io.Pipe() | 	defer cancel() | ||||||
| 	defer func() { |  | ||||||
| 		_ = batchStdinReader.Close() |  | ||||||
| 		_ = batchStdinWriter.Close() |  | ||||||
| 		_ = batchStdoutReader.Close() |  | ||||||
| 		_ = batchStdoutWriter.Close() |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	go func() { |  | ||||||
| 		stderr := strings.Builder{} |  | ||||||
| 		err := git.NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader) |  | ||||||
| 		if err != nil { |  | ||||||
| 			_ = revListWriter.CloseWithError(git.ConcatenateError(err, (&stderr).String())) |  | ||||||
| 		} else { |  | ||||||
| 			_ = revListWriter.Close() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	// For simplicities sake we'll us a buffered reader to read from the cat-file --batch |  | ||||||
| 	batchReader := bufio.NewReader(batchStdoutReader) |  | ||||||
|  |  | ||||||
| 	// We'll use a scanner for the revList because it's simpler than a bufio.Reader | 	// We'll use a scanner for the revList because it's simpler than a bufio.Reader | ||||||
| 	scan := bufio.NewScanner(revListReader) | 	scan := bufio.NewScanner(revListReader) | ||||||
|   | |||||||
| @@ -11,7 +11,6 @@ import ( | |||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"io" | 	"io" | ||||||
| 	"math" | 	"math" | ||||||
| 	"strings" |  | ||||||
|  |  | ||||||
| 	"code.gitea.io/gitea/modules/analyze" | 	"code.gitea.io/gitea/modules/analyze" | ||||||
|  |  | ||||||
| @@ -22,30 +21,8 @@ import ( | |||||||
| func (repo *Repository) GetLanguageStats(commitID string) (map[string]int64, error) { | func (repo *Repository) GetLanguageStats(commitID string) (map[string]int64, error) { | ||||||
| 	// We will feed the commit IDs in order into cat-file --batch, followed by blobs as necessary. | 	// We will feed the commit IDs in order into cat-file --batch, followed by blobs as necessary. | ||||||
| 	// so let's create a batch stdin and stdout | 	// so let's create a batch stdin and stdout | ||||||
|  | 	batchStdinWriter, batchReader, cancel := CatFileBatch(repo.Path) | ||||||
| 	batchStdinReader, batchStdinWriter := io.Pipe() | 	defer cancel() | ||||||
| 	batchStdoutReader, batchStdoutWriter := io.Pipe() |  | ||||||
| 	defer func() { |  | ||||||
| 		_ = batchStdinReader.Close() |  | ||||||
| 		_ = batchStdinWriter.Close() |  | ||||||
| 		_ = batchStdoutReader.Close() |  | ||||||
| 		_ = batchStdoutWriter.Close() |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	go func() { |  | ||||||
| 		stderr := strings.Builder{} |  | ||||||
| 		err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader) |  | ||||||
| 		if err != nil { |  | ||||||
| 			_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) |  | ||||||
| 			_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) |  | ||||||
| 		} else { |  | ||||||
| 			_ = batchStdoutWriter.Close() |  | ||||||
| 			_ = batchStdinReader.Close() |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	// For simplicities sake we'll us a buffered reader |  | ||||||
| 	batchReader := bufio.NewReader(batchStdoutReader) |  | ||||||
|  |  | ||||||
| 	writeID := func(id string) error { | 	writeID := func(id string) error { | ||||||
| 		_, err := batchStdinWriter.Write([]byte(id)) | 		_, err := batchStdinWriter.Write([]byte(id)) | ||||||
|   | |||||||
| @@ -5,7 +5,10 @@ | |||||||
| package code | package code | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"bufio" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"io/ioutil" | ||||||
| 	"os" | 	"os" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| @@ -173,7 +176,7 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) { | |||||||
| 	return indexer, created, err | 	return indexer, created, err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { | func (b *BleveIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { | ||||||
| 	// Ignore vendored files in code search | 	// Ignore vendored files in code search | ||||||
| 	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { | 	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { | ||||||
| 		return nil | 		return nil | ||||||
| @@ -196,8 +199,16 @@ func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *mode | |||||||
| 		return b.addDelete(update.Filename, repo, batch) | 		return b.addDelete(update.Filename, repo, batch) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). | 	if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { | ||||||
| 		RunInDirBytes(repo.RepoPath()) | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_, _, size, err := git.ReadBatchLine(batchReader) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} else if !base.IsTextFile(fileContents) { | 	} else if !base.IsTextFile(fileContents) { | ||||||
| @@ -254,11 +265,18 @@ func (b *BleveIndexer) Close() { | |||||||
| // Index indexes the data | // Index indexes the data | ||||||
| func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { | func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { | ||||||
| 	batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) | 	batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) | ||||||
|  | 	if len(changes.Updates) > 0 { | ||||||
|  |  | ||||||
|  | 		batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) | ||||||
|  | 		defer cancel() | ||||||
|  |  | ||||||
| 		for _, update := range changes.Updates { | 		for _, update := range changes.Updates { | ||||||
| 		if err := b.addUpdate(sha, update, repo, batch); err != nil { | 			if err := b.addUpdate(batchWriter, batchReader, sha, update, repo, batch); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		cancel() | ||||||
|  | 	} | ||||||
| 	for _, filename := range changes.RemovedFilenames { | 	for _, filename := range changes.RemovedFilenames { | ||||||
| 		if err := b.addDelete(filename, repo, batch); err != nil { | 		if err := b.addDelete(filename, repo, batch); err != nil { | ||||||
| 			return err | 			return err | ||||||
|   | |||||||
| @@ -5,8 +5,11 @@ | |||||||
| package code | package code | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"bufio" | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"io/ioutil" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -172,7 +175,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) { | |||||||
| 	return exists, nil | 	return exists, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { | func (b *ElasticSearchIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { | ||||||
| 	// Ignore vendored files in code search | 	// Ignore vendored files in code search | ||||||
| 	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { | 	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { | ||||||
| 		return nil, nil | 		return nil, nil | ||||||
| @@ -195,8 +198,16 @@ func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *mo | |||||||
| 		return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil | 		return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). | 	if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { | ||||||
| 		RunInDirBytes(repo.RepoPath()) | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_, _, size, err := git.ReadBatchLine(batchReader) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} else if !base.IsTextFile(fileContents) { | 	} else if !base.IsTextFile(fileContents) { | ||||||
| @@ -230,8 +241,13 @@ func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repositor | |||||||
| // Index will save the index data | // Index will save the index data | ||||||
| func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { | func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error { | ||||||
| 	reqs := make([]elastic.BulkableRequest, 0) | 	reqs := make([]elastic.BulkableRequest, 0) | ||||||
|  | 	if len(changes.Updates) > 0 { | ||||||
|  |  | ||||||
|  | 		batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath()) | ||||||
|  | 		defer cancel() | ||||||
|  |  | ||||||
| 		for _, update := range changes.Updates { | 		for _, update := range changes.Updates { | ||||||
| 		updateReqs, err := b.addUpdate(sha, update, repo) | 			updateReqs, err := b.addUpdate(batchWriter, batchReader, sha, update, repo) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| @@ -239,6 +255,8 @@ func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, change | |||||||
| 				reqs = append(reqs, updateReqs...) | 				reqs = append(reqs, updateReqs...) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		cancel() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	for _, filename := range changes.RemovedFilenames { | 	for _, filename := range changes.RemovedFilenames { | ||||||
| 		reqs = append(reqs, b.addDelete(filename, repo)) | 		reqs = append(reqs, b.addDelete(filename, repo)) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user