mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-27 00:23:41 +09:00 
			
		
		
		
	Refactor indexer (#25174)
Refactor `modules/indexer` to make it more maintainable. And it can be
easier to support more features. I'm trying to solve some of issue
searching, this is a precursor to making functional changes.
Current supported engines and the index versions:
| engines | issues | code |
| - | - | - |
| db | Just a wrapper for database queries, doesn't need version | - |
| bleve | The version of index is **2** | The version of index is **6**
|
| elasticsearch | The old index has no version, will be treated as
version **0** in this PR | The version of index is **1** |
| meilisearch | The old index has no version, will be treated as version
**0** in this PR | - |
## Changes
### Split
Splited it into mutiple packages
```text
indexer
├── internal
│   ├── bleve
│   ├── db
│   ├── elasticsearch
│   └── meilisearch
├── code
│   ├── bleve
│   ├── elasticsearch
│   └── internal
└── issues
    ├── bleve
    ├── db
    ├── elasticsearch
    ├── internal
    └── meilisearch
```
- `indexer/interanal`: Internal shared package for indexer.
- `indexer/interanal/[engine]`: Internal shared package for each engine
(bleve/db/elasticsearch/meilisearch).
- `indexer/code`: Implementations for code indexer.
- `indexer/code/internal`: Internal shared package for code indexer.
- `indexer/code/[engine]`: Implementation via each engine for code
indexer.
- `indexer/issues`: Implementations for issues indexer.
### Deduplication
- Combine `Init/Ping/Close` for code indexer and issues indexer.
- ~Combine `issues.indexerHolder` and `code.wrappedIndexer` to
`internal.IndexHolder`.~ Remove it, use dummy indexer instead when the
indexer is not ready.
- Duplicate two copies of creating ES clients.
- Duplicate two copies of `indexerID()`.
### Enhancement
- [x] Support index version for elasticsearch issues indexer, the old
index without version will be treated as version 0.
- [x] Fix spell of `elastic_search/ElasticSearch`, it should be
`Elasticsearch`.
- [x] Improve versioning of ES index. We don't need `Aliases`:
- Gitea does't need aliases for "Zero Downtime" because it never delete
old indexes.
- The old code of issues indexer uses the orignal name to create issue
index, so it's tricky to convert it to an alias.
- [x] Support index version for meilisearch issues indexer, the old
index without version will be treated as version 0.
- [x] Do "ping" only when `Ping` has been called, don't ping
periodically and cache the status.
- [x] Support the context parameter whenever possible.
- [x] Fix outdated example config.
- [x] Give up the requeue logic of issues indexer: When indexing fails,
call Ping to check if it was caused by the engine being unavailable, and
only requeue the task if the engine is unavailable.
- It is fragile and tricky, could cause data losing (It did happen when
I was doing some tests for this PR). And it works for ES only.
- Just always requeue the failed task, if it caused by bad data, it's a
bug of Gitea which should be fixed.
---------
Co-authored-by: Giteabot <teabot@gitea.io>
			
			
This commit is contained in:
		| @@ -7,86 +7,41 @@ import ( | ||||
| 	"context" | ||||
| 	"os" | ||||
| 	"runtime/pprof" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"code.gitea.io/gitea/models/db" | ||||
| 	repo_model "code.gitea.io/gitea/models/repo" | ||||
| 	"code.gitea.io/gitea/modules/graceful" | ||||
| 	"code.gitea.io/gitea/modules/indexer/code/bleve" | ||||
| 	"code.gitea.io/gitea/modules/indexer/code/elasticsearch" | ||||
| 	"code.gitea.io/gitea/modules/indexer/code/internal" | ||||
| 	"code.gitea.io/gitea/modules/log" | ||||
| 	"code.gitea.io/gitea/modules/process" | ||||
| 	"code.gitea.io/gitea/modules/queue" | ||||
| 	"code.gitea.io/gitea/modules/setting" | ||||
| 	"code.gitea.io/gitea/modules/timeutil" | ||||
| 	"code.gitea.io/gitea/modules/util" | ||||
| ) | ||||
|  | ||||
| // SearchResult result of performing a search in a repo | ||||
| type SearchResult struct { | ||||
| 	RepoID      int64 | ||||
| 	StartIndex  int | ||||
| 	EndIndex    int | ||||
| 	Filename    string | ||||
| 	Content     string | ||||
| 	CommitID    string | ||||
| 	UpdatedUnix timeutil.TimeStamp | ||||
| 	Language    string | ||||
| 	Color       string | ||||
| var ( | ||||
| 	indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] | ||||
| 	// globalIndexer is the global indexer, it cannot be nil. | ||||
| 	// When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. | ||||
| 	// So it's always safe use it as *globalIndexer.Load() and call its methods. | ||||
| 	globalIndexer atomic.Pointer[internal.Indexer] | ||||
| 	dummyIndexer  *internal.Indexer | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	i := internal.NewDummyIndexer() | ||||
| 	dummyIndexer = &i | ||||
| 	globalIndexer.Store(dummyIndexer) | ||||
| } | ||||
|  | ||||
| // SearchResultLanguages result of top languages count in search results | ||||
| type SearchResultLanguages struct { | ||||
| 	Language string | ||||
| 	Color    string | ||||
| 	Count    int | ||||
| } | ||||
|  | ||||
| // Indexer defines an interface to index and search code contents | ||||
| type Indexer interface { | ||||
| 	Ping() bool | ||||
| 	Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error | ||||
| 	Delete(repoID int64) error | ||||
| 	Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) | ||||
| 	Close() | ||||
| } | ||||
|  | ||||
| func filenameIndexerID(repoID int64, filename string) string { | ||||
| 	return indexerID(repoID) + "_" + filename | ||||
| } | ||||
|  | ||||
| func indexerID(id int64) string { | ||||
| 	return strconv.FormatInt(id, 36) | ||||
| } | ||||
|  | ||||
| func parseIndexerID(indexerID string) (int64, string) { | ||||
| 	index := strings.IndexByte(indexerID, '_') | ||||
| 	if index == -1 { | ||||
| 		log.Error("Unexpected ID in repo indexer: %s", indexerID) | ||||
| 	} | ||||
| 	repoID, _ := strconv.ParseInt(indexerID[:index], 36, 64) | ||||
| 	return repoID, indexerID[index+1:] | ||||
| } | ||||
|  | ||||
| func filenameOfIndexerID(indexerID string) string { | ||||
| 	index := strings.IndexByte(indexerID, '_') | ||||
| 	if index == -1 { | ||||
| 		log.Error("Unexpected ID in repo indexer: %s", indexerID) | ||||
| 	} | ||||
| 	return indexerID[index+1:] | ||||
| } | ||||
|  | ||||
| // IndexerData represents data stored in the code indexer | ||||
| type IndexerData struct { | ||||
| 	RepoID int64 | ||||
| } | ||||
|  | ||||
| var indexerQueue *queue.WorkerPoolQueue[*IndexerData] | ||||
|  | ||||
| func index(ctx context.Context, indexer Indexer, repoID int64) error { | ||||
| func index(ctx context.Context, indexer internal.Indexer, repoID int64) error { | ||||
| 	repo, err := repo_model.GetRepositoryByID(ctx, repoID) | ||||
| 	if repo_model.IsErrRepoNotExist(err) { | ||||
| 		return indexer.Delete(repoID) | ||||
| 		return indexer.Delete(ctx, repoID) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -139,7 +94,7 @@ func index(ctx context.Context, indexer Indexer, repoID int64) error { | ||||
| // Init initialize the repo indexer | ||||
| func Init() { | ||||
| 	if !setting.Indexer.RepoIndexerEnabled { | ||||
| 		indexer.Close() | ||||
| 		(*globalIndexer.Load()).Close() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -153,7 +108,7 @@ func Init() { | ||||
| 		} | ||||
| 		cancel() | ||||
| 		log.Debug("Closing repository indexer") | ||||
| 		indexer.Close() | ||||
| 		(*globalIndexer.Load()).Close() | ||||
| 		log.Info("PID: %d Repository Indexer closed", os.Getpid()) | ||||
| 		finished() | ||||
| 	}) | ||||
| @@ -163,13 +118,8 @@ func Init() { | ||||
| 	// Create the Queue | ||||
| 	switch setting.Indexer.RepoType { | ||||
| 	case "bleve", "elasticsearch": | ||||
| 		handler := func(items ...*IndexerData) (unhandled []*IndexerData) { | ||||
| 			idx, err := indexer.get() | ||||
| 			if idx == nil || err != nil { | ||||
| 				log.Warn("Codes indexer handler: indexer is not ready, retry later.") | ||||
| 				return items | ||||
| 			} | ||||
|  | ||||
| 		handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { | ||||
| 			indexer := *globalIndexer.Load() | ||||
| 			for _, indexerData := range items { | ||||
| 				log.Trace("IndexerData Process Repo: %d", indexerData.RepoID) | ||||
|  | ||||
| @@ -188,11 +138,7 @@ func Init() { | ||||
| 					code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105) | ||||
| 				*/ | ||||
| 				if err := index(ctx, indexer, indexerData.RepoID); err != nil { | ||||
| 					if !idx.Ping() { | ||||
| 						log.Error("Code indexer handler: indexer is unavailable.") | ||||
| 						unhandled = append(unhandled, indexerData) | ||||
| 						continue | ||||
| 					} | ||||
| 					unhandled = append(unhandled, indexerData) | ||||
| 					if !setting.IsInTesting { | ||||
| 						log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err) | ||||
| 					} | ||||
| @@ -213,8 +159,8 @@ func Init() { | ||||
| 		pprof.SetGoroutineLabels(ctx) | ||||
| 		start := time.Now() | ||||
| 		var ( | ||||
| 			rIndexer Indexer | ||||
| 			populate bool | ||||
| 			rIndexer internal.Indexer | ||||
| 			existed  bool | ||||
| 			err      error | ||||
| 		) | ||||
| 		switch setting.Indexer.RepoType { | ||||
| @@ -228,10 +174,11 @@ func Init() { | ||||
| 				} | ||||
| 			}() | ||||
|  | ||||
| 			rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath) | ||||
| 			rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath) | ||||
| 			existed, err = rIndexer.Init(ctx) | ||||
| 			if err != nil { | ||||
| 				cancel() | ||||
| 				indexer.Close() | ||||
| 				(*globalIndexer.Load()).Close() | ||||
| 				close(waitChannel) | ||||
| 				log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) | ||||
| 			} | ||||
| @@ -245,23 +192,31 @@ func Init() { | ||||
| 				} | ||||
| 			}() | ||||
|  | ||||
| 			rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) | ||||
| 			rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) | ||||
| 			if err != nil { | ||||
| 				cancel() | ||||
| 				indexer.Close() | ||||
| 				(*globalIndexer.Load()).Close() | ||||
| 				close(waitChannel) | ||||
| 				log.Fatal("PID: %d Unable to create the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) | ||||
| 			} | ||||
| 			existed, err = rIndexer.Init(ctx) | ||||
| 			if err != nil { | ||||
| 				cancel() | ||||
| 				(*globalIndexer.Load()).Close() | ||||
| 				close(waitChannel) | ||||
| 				log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) | ||||
| 			} | ||||
|  | ||||
| 		default: | ||||
| 			log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) | ||||
| 		} | ||||
|  | ||||
| 		indexer.set(rIndexer) | ||||
| 		globalIndexer.Store(&rIndexer) | ||||
|  | ||||
| 		// Start processing the queue | ||||
| 		go graceful.GetManager().RunWithCancel(indexerQueue) | ||||
|  | ||||
| 		if populate { | ||||
| 		if !existed { // populate the index because it's created for the first time | ||||
| 			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) | ||||
| 		} | ||||
| 		select { | ||||
| @@ -283,18 +238,18 @@ func Init() { | ||||
| 			case <-graceful.GetManager().IsShutdown(): | ||||
| 				log.Warn("Shutdown before Repository Indexer completed initialization") | ||||
| 				cancel() | ||||
| 				indexer.Close() | ||||
| 				(*globalIndexer.Load()).Close() | ||||
| 			case duration, ok := <-waitChannel: | ||||
| 				if !ok { | ||||
| 					log.Warn("Repository Indexer Initialization failed") | ||||
| 					cancel() | ||||
| 					indexer.Close() | ||||
| 					(*globalIndexer.Load()).Close() | ||||
| 					return | ||||
| 				} | ||||
| 				log.Info("Repository Indexer Initialization took %v", duration) | ||||
| 			case <-time.After(timeout): | ||||
| 				cancel() | ||||
| 				indexer.Close() | ||||
| 				(*globalIndexer.Load()).Close() | ||||
| 				log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) | ||||
| 			} | ||||
| 		}() | ||||
| @@ -303,21 +258,15 @@ func Init() { | ||||
|  | ||||
| // UpdateRepoIndexer update a repository's entries in the indexer | ||||
| func UpdateRepoIndexer(repo *repo_model.Repository) { | ||||
| 	indexData := &IndexerData{RepoID: repo.ID} | ||||
| 	indexData := &internal.IndexerData{RepoID: repo.ID} | ||||
| 	if err := indexerQueue.Push(indexData); err != nil { | ||||
| 		log.Error("Update repo index data %v failed: %v", indexData, err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // IsAvailable checks if issue indexer is available | ||||
| func IsAvailable() bool { | ||||
| 	idx, err := indexer.get() | ||||
| 	if err != nil { | ||||
| 		log.Error("IsAvailable(): unable to get indexer: %v", err) | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	return idx.Ping() | ||||
| func IsAvailable(ctx context.Context) bool { | ||||
| 	return (*globalIndexer.Load()).Ping(ctx) == nil | ||||
| } | ||||
|  | ||||
| // populateRepoIndexer populate the repo indexer with pre-existing data. This | ||||
| @@ -368,7 +317,7 @@ func populateRepoIndexer(ctx context.Context) { | ||||
| 				return | ||||
| 			default: | ||||
| 			} | ||||
| 			if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { | ||||
| 			if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil { | ||||
| 				log.Error("indexerQueue.Push: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user