mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-03 08:02:36 +09:00 
			
		
		
		
	* Queue: Add generic graceful queues with settings
* Queue & Setting: Add worker pool implementation
* Queue: Add worker settings
* Queue: Make resizing worker pools
* Queue: Add name variable to queues
* Queue: Add monitoring
* Queue: Improve logging
* Issues: Gracefulise the issues indexer
Remove the old now unused specific queues
* Task: Move to generic queue and gracefulise
* Issues: Standardise the issues indexer queue settings
* Fix test
* Queue: Allow Redis to connect to unix
* Prevent deadlock during early shutdown of issue indexer
* Add MaxWorker settings to queues
* Merge branch 'master' into graceful-queues
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/indexer/issues/indexer.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Update modules/queue/queue_disk.go
* Update modules/queue/queue_disk_channel.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* Rename queue.Description to queue.ManagedQueue as per @guillep2k
* Cancel pool workers when removed
* Remove dependency on queue from setting
* Update modules/queue/queue_redis.go
Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>
* As per @guillep2k add mutex locks on shutdown/terminate
* move unlocking out of setInternal
* Add warning if number of workers < 0
* Small changes as per @guillep2k
* No redis host specified not found
* Clean up documentation for queues
* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md
* Update modules/indexer/issues/indexer_test.go
* Ensure that persistable channel queue is added to manager
* Rename QUEUE_NAME REDIS_QUEUE_NAME
* Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME"
This reverts commit 1f83b4fc9b.
Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: techknowlogick <matti@mdranta.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
		
	
		
			
				
	
	
		
			271 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2019 The Gitea Authors. All rights reserved.
 | 
						|
// Use of this source code is governed by a MIT-style
 | 
						|
// license that can be found in the LICENSE file.
 | 
						|
 | 
						|
package queue
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"reflect"
 | 
						|
	"sort"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"code.gitea.io/gitea/modules/log"
 | 
						|
)
 | 
						|
 | 
						|
var manager *Manager
 | 
						|
 | 
						|
// Manager is a queue manager
 | 
						|
type Manager struct {
 | 
						|
	mutex sync.Mutex
 | 
						|
 | 
						|
	counter int64
 | 
						|
	Queues  map[int64]*ManagedQueue
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueue represents a working queue inheriting from Gitea.
 | 
						|
type ManagedQueue struct {
 | 
						|
	mutex         sync.Mutex
 | 
						|
	QID           int64
 | 
						|
	Queue         Queue
 | 
						|
	Type          Type
 | 
						|
	Name          string
 | 
						|
	Configuration interface{}
 | 
						|
	ExemplarType  string
 | 
						|
	Pool          ManagedPool
 | 
						|
	counter       int64
 | 
						|
	PoolWorkers   map[int64]*PoolWorkers
 | 
						|
}
 | 
						|
 | 
						|
// ManagedPool is a simple interface to get certain details from a worker pool
 | 
						|
type ManagedPool interface {
 | 
						|
	AddWorkers(number int, timeout time.Duration) context.CancelFunc
 | 
						|
	NumberOfWorkers() int
 | 
						|
	MaxNumberOfWorkers() int
 | 
						|
	SetMaxNumberOfWorkers(int)
 | 
						|
	BoostTimeout() time.Duration
 | 
						|
	BlockTimeout() time.Duration
 | 
						|
	BoostWorkers() int
 | 
						|
	SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueueList implements the sort.Interface
 | 
						|
type ManagedQueueList []*ManagedQueue
 | 
						|
 | 
						|
// PoolWorkers represents a working queue inheriting from Gitea.
 | 
						|
type PoolWorkers struct {
 | 
						|
	PID        int64
 | 
						|
	Workers    int
 | 
						|
	Start      time.Time
 | 
						|
	Timeout    time.Time
 | 
						|
	HasTimeout bool
 | 
						|
	Cancel     context.CancelFunc
 | 
						|
}
 | 
						|
 | 
						|
// PoolWorkersList implements the sort.Interface
 | 
						|
type PoolWorkersList []*PoolWorkers
 | 
						|
 | 
						|
func init() {
 | 
						|
	_ = GetManager()
 | 
						|
}
 | 
						|
 | 
						|
// GetManager returns a Manager and initializes one as singleton if there's none yet
 | 
						|
func GetManager() *Manager {
 | 
						|
	if manager == nil {
 | 
						|
		manager = &Manager{
 | 
						|
			Queues: make(map[int64]*ManagedQueue),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return manager
 | 
						|
}
 | 
						|
 | 
						|
// Add adds a queue to this manager
 | 
						|
func (m *Manager) Add(queue Queue,
 | 
						|
	t Type,
 | 
						|
	configuration,
 | 
						|
	exemplar interface{},
 | 
						|
	pool ManagedPool) int64 {
 | 
						|
 | 
						|
	cfg, _ := json.Marshal(configuration)
 | 
						|
	mq := &ManagedQueue{
 | 
						|
		Queue:         queue,
 | 
						|
		Type:          t,
 | 
						|
		Configuration: string(cfg),
 | 
						|
		ExemplarType:  reflect.TypeOf(exemplar).String(),
 | 
						|
		PoolWorkers:   make(map[int64]*PoolWorkers),
 | 
						|
		Pool:          pool,
 | 
						|
	}
 | 
						|
	m.mutex.Lock()
 | 
						|
	m.counter++
 | 
						|
	mq.QID = m.counter
 | 
						|
	mq.Name = fmt.Sprintf("queue-%d", mq.QID)
 | 
						|
	if named, ok := queue.(Named); ok {
 | 
						|
		mq.Name = named.Name()
 | 
						|
	}
 | 
						|
	m.Queues[mq.QID] = mq
 | 
						|
	m.mutex.Unlock()
 | 
						|
	log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
 | 
						|
	return mq.QID
 | 
						|
}
 | 
						|
 | 
						|
// Remove a queue from the Manager
 | 
						|
func (m *Manager) Remove(qid int64) {
 | 
						|
	m.mutex.Lock()
 | 
						|
	delete(m.Queues, qid)
 | 
						|
	m.mutex.Unlock()
 | 
						|
	log.Trace("Queue Manager removed: QID: %d", qid)
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// GetManagedQueue by qid
 | 
						|
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
 | 
						|
	m.mutex.Lock()
 | 
						|
	defer m.mutex.Unlock()
 | 
						|
	return m.Queues[qid]
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueues returns the managed queues
 | 
						|
func (m *Manager) ManagedQueues() []*ManagedQueue {
 | 
						|
	m.mutex.Lock()
 | 
						|
	mqs := make([]*ManagedQueue, 0, len(m.Queues))
 | 
						|
	for _, mq := range m.Queues {
 | 
						|
		mqs = append(mqs, mq)
 | 
						|
	}
 | 
						|
	m.mutex.Unlock()
 | 
						|
	sort.Sort(ManagedQueueList(mqs))
 | 
						|
	return mqs
 | 
						|
}
 | 
						|
 | 
						|
// Workers returns the poolworkers
 | 
						|
func (q *ManagedQueue) Workers() []*PoolWorkers {
 | 
						|
	q.mutex.Lock()
 | 
						|
	workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
 | 
						|
	for _, worker := range q.PoolWorkers {
 | 
						|
		workers = append(workers, worker)
 | 
						|
	}
 | 
						|
	q.mutex.Unlock()
 | 
						|
	sort.Sort(PoolWorkersList(workers))
 | 
						|
	return workers
 | 
						|
}
 | 
						|
 | 
						|
// RegisterWorkers registers workers to this queue
 | 
						|
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
 | 
						|
	q.mutex.Lock()
 | 
						|
	defer q.mutex.Unlock()
 | 
						|
	q.counter++
 | 
						|
	q.PoolWorkers[q.counter] = &PoolWorkers{
 | 
						|
		PID:        q.counter,
 | 
						|
		Workers:    number,
 | 
						|
		Start:      start,
 | 
						|
		Timeout:    timeout,
 | 
						|
		HasTimeout: hasTimeout,
 | 
						|
		Cancel:     cancel,
 | 
						|
	}
 | 
						|
	return q.counter
 | 
						|
}
 | 
						|
 | 
						|
// CancelWorkers cancels pooled workers with pid
 | 
						|
func (q *ManagedQueue) CancelWorkers(pid int64) {
 | 
						|
	q.mutex.Lock()
 | 
						|
	pw, ok := q.PoolWorkers[pid]
 | 
						|
	q.mutex.Unlock()
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	pw.Cancel()
 | 
						|
}
 | 
						|
 | 
						|
// RemoveWorkers deletes pooled workers with pid
 | 
						|
func (q *ManagedQueue) RemoveWorkers(pid int64) {
 | 
						|
	q.mutex.Lock()
 | 
						|
	pw, ok := q.PoolWorkers[pid]
 | 
						|
	delete(q.PoolWorkers, pid)
 | 
						|
	q.mutex.Unlock()
 | 
						|
	if ok && pw.Cancel != nil {
 | 
						|
		pw.Cancel()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// AddWorkers adds workers to the queue if it has registered an add worker function
 | 
						|
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
 | 
						|
	if q.Pool != nil {
 | 
						|
		// the cancel will be added to the pool workers description above
 | 
						|
		return q.Pool.AddWorkers(number, timeout)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NumberOfWorkers returns the number of workers in the queue
 | 
						|
func (q *ManagedQueue) NumberOfWorkers() int {
 | 
						|
	if q.Pool != nil {
 | 
						|
		return q.Pool.NumberOfWorkers()
 | 
						|
	}
 | 
						|
	return -1
 | 
						|
}
 | 
						|
 | 
						|
// MaxNumberOfWorkers returns the maximum number of workers for the pool
 | 
						|
func (q *ManagedQueue) MaxNumberOfWorkers() int {
 | 
						|
	if q.Pool != nil {
 | 
						|
		return q.Pool.MaxNumberOfWorkers()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// BoostWorkers returns the number of workers for a boost
 | 
						|
func (q *ManagedQueue) BoostWorkers() int {
 | 
						|
	if q.Pool != nil {
 | 
						|
		return q.Pool.BoostWorkers()
 | 
						|
	}
 | 
						|
	return -1
 | 
						|
}
 | 
						|
 | 
						|
// BoostTimeout returns the timeout of the next boost
 | 
						|
func (q *ManagedQueue) BoostTimeout() time.Duration {
 | 
						|
	if q.Pool != nil {
 | 
						|
		return q.Pool.BoostTimeout()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// BlockTimeout returns the timeout til the next boost
 | 
						|
func (q *ManagedQueue) BlockTimeout() time.Duration {
 | 
						|
	if q.Pool != nil {
 | 
						|
		return q.Pool.BlockTimeout()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// SetSettings sets the setable boost values
 | 
						|
func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
 | 
						|
	if q.Pool != nil {
 | 
						|
		q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Len() int {
 | 
						|
	return len(l)
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Less(i, j int) bool {
 | 
						|
	return l[i].Name < l[j].Name
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Swap(i, j int) {
 | 
						|
	l[i], l[j] = l[j], l[i]
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Len() int {
 | 
						|
	return len(l)
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Less(i, j int) bool {
 | 
						|
	return l[i].Start.Before(l[j].Start)
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Swap(i, j int) {
 | 
						|
	l[i], l[j] = l[j], l[i]
 | 
						|
}
 |