mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-03 08:02:36 +09:00 
			
		
		
		
	
		
			
				
	
	
		
			411 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			411 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
package couchbase
 | 
						|
 | 
						|
import (
 | 
						|
	"crypto/tls"
 | 
						|
	"errors"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/couchbase/gomemcached"
 | 
						|
	"github.com/couchbase/gomemcached/client"
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
)
 | 
						|
 | 
						|
// GenericMcdAuthHandler is a kind of AuthHandler that performs
 | 
						|
// special auth exchange (like non-standard auth, possibly followed by
 | 
						|
// select-bucket).
 | 
						|
type GenericMcdAuthHandler interface {
 | 
						|
	AuthHandler
 | 
						|
	AuthenticateMemcachedConn(host string, conn *memcached.Client) error
 | 
						|
}
 | 
						|
 | 
						|
// Error raised when a connection can't be retrieved from a pool.
 | 
						|
var TimeoutError = errors.New("timeout waiting to build connection")
 | 
						|
var errClosedPool = errors.New("the connection pool is closed")
 | 
						|
var errNoPool = errors.New("no connection pool")
 | 
						|
 | 
						|
// Default timeout for retrieving a connection from the pool.
 | 
						|
var ConnPoolTimeout = time.Hour * 24 * 30
 | 
						|
 | 
						|
// overflow connection closer cycle time
 | 
						|
var ConnCloserInterval = time.Second * 30
 | 
						|
 | 
						|
// ConnPoolAvailWaitTime is the amount of time to wait for an existing
 | 
						|
// connection from the pool before considering the creation of a new
 | 
						|
// one.
 | 
						|
var ConnPoolAvailWaitTime = time.Millisecond
 | 
						|
 | 
						|
type connectionPool struct {
 | 
						|
	host        string
 | 
						|
	mkConn      func(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error)
 | 
						|
	auth        AuthHandler
 | 
						|
	connections chan *memcached.Client
 | 
						|
	createsem   chan bool
 | 
						|
	bailOut     chan bool
 | 
						|
	poolSize    int
 | 
						|
	connCount   uint64
 | 
						|
	inUse       bool
 | 
						|
	tlsConfig   *tls.Config
 | 
						|
	bucket string
 | 
						|
}
 | 
						|
 | 
						|
func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int, tlsConfig *tls.Config, bucket string) *connectionPool {
 | 
						|
	connSize := poolSize
 | 
						|
	if closer {
 | 
						|
		connSize += poolOverflow
 | 
						|
	}
 | 
						|
	rv := &connectionPool{
 | 
						|
		host:        host,
 | 
						|
		connections: make(chan *memcached.Client, connSize),
 | 
						|
		createsem:   make(chan bool, poolSize+poolOverflow),
 | 
						|
		mkConn:      defaultMkConn,
 | 
						|
		auth:        ah,
 | 
						|
		poolSize:    poolSize,
 | 
						|
		tlsConfig:   tlsConfig,
 | 
						|
		bucket:      bucket,
 | 
						|
	}
 | 
						|
	if closer {
 | 
						|
		rv.bailOut = make(chan bool, 1)
 | 
						|
		go rv.connCloser()
 | 
						|
	}
 | 
						|
	return rv
 | 
						|
}
 | 
						|
 | 
						|
// ConnPoolTimeout is notified whenever connections are acquired from a pool.
 | 
						|
var ConnPoolCallback func(host string, source string, start time.Time, err error)
 | 
						|
 | 
						|
// Use regular in-the-clear connection if tlsConfig is nil.
 | 
						|
// Use secure connection (TLS) if tlsConfig is set.
 | 
						|
func defaultMkConn(host string, ah AuthHandler, tlsConfig *tls.Config, bucketName string) (*memcached.Client, error) {
 | 
						|
	var features memcached.Features
 | 
						|
 | 
						|
	var conn *memcached.Client
 | 
						|
	var err error
 | 
						|
	if tlsConfig == nil {
 | 
						|
		conn, err = memcached.Connect("tcp", host)
 | 
						|
	} else {
 | 
						|
		conn, err = memcached.ConnectTLS("tcp", host, tlsConfig)
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if TCPKeepalive == true {
 | 
						|
		conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
 | 
						|
	}
 | 
						|
 | 
						|
	if EnableMutationToken == true {
 | 
						|
		features = append(features, memcached.FeatureMutationToken)
 | 
						|
	}
 | 
						|
	if EnableDataType == true {
 | 
						|
		features = append(features, memcached.FeatureDataType)
 | 
						|
	}
 | 
						|
 | 
						|
	if EnableXattr == true {
 | 
						|
		features = append(features, memcached.FeatureXattr)
 | 
						|
	}
 | 
						|
 | 
						|
	if EnableCollections {
 | 
						|
		features = append(features, memcached.FeatureCollections)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(features) > 0 {
 | 
						|
		if DefaultTimeout > 0 {
 | 
						|
			conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
 | 
						|
		}
 | 
						|
 | 
						|
		res, err := conn.EnableFeatures(features)
 | 
						|
 | 
						|
		if DefaultTimeout > 0 {
 | 
						|
			conn.SetDeadline(noDeadline)
 | 
						|
		}
 | 
						|
 | 
						|
		if err != nil && isTimeoutError(err) {
 | 
						|
			conn.Close()
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		if err != nil || res.Status != gomemcached.SUCCESS {
 | 
						|
			logging.Warnf("Unable to enable features %v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if gah, ok := ah.(GenericMcdAuthHandler); ok {
 | 
						|
		err = gah.AuthenticateMemcachedConn(host, conn)
 | 
						|
		if err != nil {
 | 
						|
			conn.Close()
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		return conn, nil
 | 
						|
	}
 | 
						|
	name, pass, bucket := ah.GetCredentials()
 | 
						|
	if bucket  == "" {
 | 
						|
		// Authenticator does not know specific bucket.
 | 
						|
		bucket = bucketName
 | 
						|
	}
 | 
						|
 | 
						|
	if name != "default" {
 | 
						|
		_, err = conn.Auth(name, pass)
 | 
						|
		if err != nil {
 | 
						|
			conn.Close()
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		// Select bucket (Required for cb_auth creds)
 | 
						|
		// Required when doing auth with _admin credentials
 | 
						|
		if bucket != "" && bucket != name {
 | 
						|
			_, err = conn.SelectBucket(bucket)
 | 
						|
			if err != nil {
 | 
						|
				conn.Close()
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return conn, nil
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) Close() (err error) {
 | 
						|
	defer func() {
 | 
						|
		if recover() != nil {
 | 
						|
			err = errors.New("connectionPool.Close error")
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	if cp.bailOut != nil {
 | 
						|
 | 
						|
		// defensively, we won't wait if the channel is full
 | 
						|
		select {
 | 
						|
		case cp.bailOut <- false:
 | 
						|
		default:
 | 
						|
		}
 | 
						|
	}
 | 
						|
	close(cp.connections)
 | 
						|
	for c := range cp.connections {
 | 
						|
		c.Close()
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) Node() string {
 | 
						|
	return cp.host
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
 | 
						|
	if cp == nil {
 | 
						|
		return nil, errNoPool
 | 
						|
	}
 | 
						|
 | 
						|
	path := ""
 | 
						|
 | 
						|
	if ConnPoolCallback != nil {
 | 
						|
		defer func(path *string, start time.Time) {
 | 
						|
			ConnPoolCallback(cp.host, *path, start, err)
 | 
						|
		}(&path, time.Now())
 | 
						|
	}
 | 
						|
 | 
						|
	path = "short-circuit"
 | 
						|
 | 
						|
	// short-circuit available connetions.
 | 
						|
	select {
 | 
						|
	case rv, isopen := <-cp.connections:
 | 
						|
		if !isopen {
 | 
						|
			return nil, errClosedPool
 | 
						|
		}
 | 
						|
		atomic.AddUint64(&cp.connCount, 1)
 | 
						|
		return rv, nil
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	t := time.NewTimer(ConnPoolAvailWaitTime)
 | 
						|
	defer t.Stop()
 | 
						|
 | 
						|
	// Try to grab an available connection within 1ms
 | 
						|
	select {
 | 
						|
	case rv, isopen := <-cp.connections:
 | 
						|
		path = "avail1"
 | 
						|
		if !isopen {
 | 
						|
			return nil, errClosedPool
 | 
						|
		}
 | 
						|
		atomic.AddUint64(&cp.connCount, 1)
 | 
						|
		return rv, nil
 | 
						|
	case <-t.C:
 | 
						|
		// No connection came around in time, let's see
 | 
						|
		// whether we can get one or build a new one first.
 | 
						|
		t.Reset(d) // Reuse the timer for the full timeout.
 | 
						|
		select {
 | 
						|
		case rv, isopen := <-cp.connections:
 | 
						|
			path = "avail2"
 | 
						|
			if !isopen {
 | 
						|
				return nil, errClosedPool
 | 
						|
			}
 | 
						|
			atomic.AddUint64(&cp.connCount, 1)
 | 
						|
			return rv, nil
 | 
						|
		case cp.createsem <- true:
 | 
						|
			path = "create"
 | 
						|
			// Build a connection if we can't get a real one.
 | 
						|
			// This can potentially be an overflow connection, or
 | 
						|
			// a pooled connection.
 | 
						|
			rv, err := cp.mkConn(cp.host, cp.auth, cp.tlsConfig, cp.bucket)
 | 
						|
			if err != nil {
 | 
						|
				// On error, release our create hold
 | 
						|
				<-cp.createsem
 | 
						|
			} else {
 | 
						|
				atomic.AddUint64(&cp.connCount, 1)
 | 
						|
			}
 | 
						|
			return rv, err
 | 
						|
		case <-t.C:
 | 
						|
			return nil, ErrTimeout
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) Get() (*memcached.Client, error) {
 | 
						|
	return cp.GetWithTimeout(ConnPoolTimeout)
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) Return(c *memcached.Client) {
 | 
						|
	if c == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if cp == nil {
 | 
						|
		c.Close()
 | 
						|
	}
 | 
						|
 | 
						|
	if c.IsHealthy() {
 | 
						|
		defer func() {
 | 
						|
			if recover() != nil {
 | 
						|
				// This happens when the pool has already been
 | 
						|
				// closed and we're trying to return a
 | 
						|
				// connection to it anyway.  Just close the
 | 
						|
				// connection.
 | 
						|
				c.Close()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
 | 
						|
		select {
 | 
						|
		case cp.connections <- c:
 | 
						|
		default:
 | 
						|
			<-cp.createsem
 | 
						|
			c.Close()
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		<-cp.createsem
 | 
						|
		c.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// give the ability to discard a connection from a pool
 | 
						|
// useful for ditching connections to the wrong node after a rebalance
 | 
						|
func (cp *connectionPool) Discard(c *memcached.Client) {
 | 
						|
	<-cp.createsem
 | 
						|
	c.Close()
 | 
						|
}
 | 
						|
 | 
						|
// asynchronous connection closer
 | 
						|
func (cp *connectionPool) connCloser() {
 | 
						|
	var connCount uint64
 | 
						|
 | 
						|
	t := time.NewTimer(ConnCloserInterval)
 | 
						|
	defer t.Stop()
 | 
						|
 | 
						|
	for {
 | 
						|
		connCount = cp.connCount
 | 
						|
 | 
						|
		// we don't exist anymore! bail out!
 | 
						|
		select {
 | 
						|
		case <-cp.bailOut:
 | 
						|
			return
 | 
						|
		case <-t.C:
 | 
						|
		}
 | 
						|
		t.Reset(ConnCloserInterval)
 | 
						|
 | 
						|
		// no overflow connections open or sustained requests for connections
 | 
						|
		// nothing to do until the next cycle
 | 
						|
		if len(cp.connections) <= cp.poolSize ||
 | 
						|
			ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// close overflow connections now that they are not needed
 | 
						|
		for c := range cp.connections {
 | 
						|
			select {
 | 
						|
			case <-cp.bailOut:
 | 
						|
				return
 | 
						|
			default:
 | 
						|
			}
 | 
						|
 | 
						|
			// bail out if close did not work out
 | 
						|
			if !cp.connCleanup(c) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if len(cp.connections) <= cp.poolSize {
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// close connection with recovery on error
 | 
						|
func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
 | 
						|
 | 
						|
	// just in case we are closing a connection after
 | 
						|
	// bailOut has been sent but we haven't yet read it
 | 
						|
	defer func() {
 | 
						|
		if recover() != nil {
 | 
						|
			rv = false
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	rv = true
 | 
						|
 | 
						|
	c.Close()
 | 
						|
	<-cp.createsem
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
 | 
						|
	if cp == nil {
 | 
						|
		return nil, errNoPool
 | 
						|
	}
 | 
						|
	mc, err := cp.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// A connection can't be used after TAP; Dont' count it against the
 | 
						|
	// connection pool capacity
 | 
						|
	<-cp.createsem
 | 
						|
 | 
						|
	return mc.StartTapFeed(*args)
 | 
						|
}
 | 
						|
 | 
						|
const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
 | 
						|
 | 
						|
func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
 | 
						|
	if cp == nil {
 | 
						|
		return nil, errNoPool
 | 
						|
	}
 | 
						|
	mc, err := cp.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// A connection can't be used after it has been allocated to UPR;
 | 
						|
	// Dont' count it against the connection pool capacity
 | 
						|
	<-cp.createsem
 | 
						|
 | 
						|
	uf, err := mc.NewUprFeed()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return uf, nil
 | 
						|
}
 |