// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"time"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/tidb/context"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta"
	"github.com/pingcap/tidb/model"
	"github.com/pingcap/tidb/terror"
)

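// startDDLJob enqueues a new DDL job in its own transaction, notifies the worker,
// and then polls the history queue until the job has finished or been cancelled.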
func (d *ddl) startDDLJob(ctx context.Context, job *model.Job) error {
	// for every DDL, we must commit the current transaction first.
	if err := ctx.FinishTxn(false); err != nil {
		return errors.Trace(err)
	}

	// Create a new job and queue it.
	err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		var err error
		job.ID, err = t.GenGlobalID()
		if err != nil {
			return errors.Trace(err)
		}

		err = t.EnQueueDDLJob(job)
		return errors.Trace(err)
	})

	if err != nil {
		return errors.Trace(err)
	}

	// notify the worker that we have pushed a new job, then wait for the job to be done.
	asyncNotify(d.ddlJobCh)

	log.Warnf("[ddl] start DDL job %v", job)

	jobID := job.ID

	var historyJob *model.Job

	// from start to end, a job's state will be none -> delete only -> write only -> reorganization -> public.
	// for every state change we wait 2 * lease time, so here the ticker checks every 10 * lease.
	ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
	defer ticker.Stop()
	for {
		select {
		case <-d.ddlJobDoneCh:
		case <-ticker.C:
		}

		historyJob, err = d.getHistoryDDLJob(jobID)
		if err != nil {
			log.Errorf("[ddl] get history DDL job err %v, check again", err)
			continue
		} else if historyJob == nil {
			log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID)
			continue
		}

		// if a job is in the history queue, its state must be JobDone or JobCancelled.
		if historyJob.State == model.JobDone {
			return nil
		}

		return errors.Errorf(historyJob.Error)
	}
}

func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
	var job *model.Job

	err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		var err1 error
		job, err1 = t.GetHistoryDDLJob(id)
		return errors.Trace(err1)
	})

	return job, errors.Trace(err)
}

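// asyncNotify does a non-blocking send on ch. If the worker is not waiting on the
// channel, the notification is simply dropped; the worker's ticker will pick the
// job up on its next check.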
func asyncNotify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

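// checkOwner tries to acquire or keep ownership of the DDL (or background) job queue.
// If there is no owner yet, or the current owner has not refreshed its status within
// 4 * lease, this node takes over; otherwise ErrNotOwner is returned.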
func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
	var owner *model.Owner
	var err error

	switch flag {
	case ddlJobFlag:
		owner, err = t.GetDDLJobOwner()
	case bgJobFlag:
		owner, err = t.GetBgJobOwner()
	default:
		err = errInvalidJobFlag
	}
	if err != nil {
		return nil, errors.Trace(err)
	}

	if owner == nil {
		owner = &model.Owner{}
		// try to set the owner.
		owner.OwnerID = d.uuid
	}

	now := time.Now().UnixNano()
	// we must wait 2 * lease time to guarantee that other servers have updated the schema.
	// the owner updates its status every 2 * lease time, so here we use
	// 4 * lease to check for its timeout.
	maxTimeout := int64(4 * d.lease)
	if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout {
		owner.OwnerID = d.uuid
		owner.LastUpdateTS = now
		// update status.
		switch flag {
		case ddlJobFlag:
			err = t.SetDDLJobOwner(owner)
		case bgJobFlag:
			err = t.SetBgJobOwner(owner)
		}
		if err != nil {
			return nil, errors.Trace(err)
		}
		log.Debugf("[ddl] become %s job owner %s", flag, owner.OwnerID)
	}

	if owner.OwnerID != d.uuid {
		log.Debugf("[ddl] not %s job owner, owner is %s", flag, owner.OwnerID)
		return nil, errors.Trace(ErrNotOwner)
	}

	return owner, nil
}

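// getFirstDDLJob returns the job at the head of the DDL job queue (index 0),
// or nil if the queue is empty.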
func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
	job, err := t.GetDDLJob(0)
	return job, errors.Trace(err)
}

// every time the job enters another state except the final one, we must call this function.
func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error {
	err := t.UpdateDDLJob(0, job)
	return errors.Trace(err)
}

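// finishDDLJob dequeues the finished job, prepares a background job for dropped
// schemas or tables, and moves the job into the history queue.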
func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error {
	log.Warnf("[ddl] finish DDL job %v", job)
	// the job is done: dequeue it and let the worker run the next one.
	_, err := t.DeQueueDDLJob()
	if err != nil {
		return errors.Trace(err)
	}
	switch job.Type {
	case model.ActionDropSchema, model.ActionDropTable:
		if err = d.prepareBgJob(job); err != nil {
			return errors.Trace(err)
		}
	}

	err = t.AddHistoryDDLJob(job)
	return errors.Trace(err)
}

// ErrNotOwner means we are not the owner and cannot handle DDL jobs.
var ErrNotOwner = errors.New("DDL: not owner")

// ErrWorkerClosed means we have already closed the DDL worker.
var ErrWorkerClosed = errors.New("DDL: worker is closed")

var errInvalidJobFlag = errors.New("DDL: invalid job flag")

// JobType is the job type, either ddl or background.
type JobType int

const (
	ddlJobFlag = iota + 1
	bgJobFlag
)

func (j JobType) String() string {
	switch j {
	case ddlJobFlag:
		return "ddl"
	case bgJobFlag:
		return "background"
	}

	return "unknown"
}

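// handleDDLJobQueue runs DDL jobs one state transition at a time. In each transaction
// it checks ownership, takes the first queued job, runs it, persists the result and
// refreshes the owner status; outside the transaction it waits up to 2 * lease so that
// other servers can load the new schema before the next transition.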
func (d *ddl) handleDDLJobQueue() error {
	for {
		if d.isClosed() {
			return nil
		}

		waitTime := 2 * d.lease

		var job *model.Job
		err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
			t := meta.NewMeta(txn)
			owner, err := d.checkOwner(t, ddlJobFlag)
			if terror.ErrorEqual(err, ErrNotOwner) {
				// we are not the owner, return and retry checking later.
				return nil
			} else if err != nil {
				return errors.Trace(err)
			}

			// we are the owner now: get the first job and run it.
			job, err = d.getFirstDDLJob(t)
			if job == nil || err != nil {
				return errors.Trace(err)
			}

			if job.IsRunning() {
				// if we entered a new state, crashed while waiting the 2 * lease time, and restarted quickly,
				// we may run the job again immediately without having waited the full 2 * lease time for
				// other servers to update the schema.
				// so here we check the elapsed time since the last update; if it is < 2 * lease, we must
				// wait again.
				elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS)
				if elapsed > 0 && elapsed < waitTime {
					log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
					waitTime -= elapsed
					return nil
				}
			}

			log.Warnf("[ddl] run DDL job %v", job)

			d.hook.OnJobRunBefore(job)

			// if running the job meets an error, we save the error in job.Error
			// and retry later if the job is not cancelled.
			d.runDDLJob(t, job)

			if job.IsFinished() {
				err = d.finishDDLJob(t, job)
			} else {
				err = d.updateDDLJob(t, job)
			}
			if err != nil {
				return errors.Trace(err)
			}

			// running the job may take some time, so here we must update the owner status to
			// prevent others from becoming the owner.
			owner.LastUpdateTS = time.Now().UnixNano()
			err = t.SetDDLJobOwner(owner)

			return errors.Trace(err)
		})
		if err != nil {
			return errors.Trace(err)
		} else if job == nil {
			// no job now, return and retry later.
			return nil
		}

		d.hook.OnJobUpdated(job)

		// here the job has entered another state (delete only, write only, public, etc.) or has been cancelled.
		// if the job is done or still running, we wait 2 * lease time to guarantee that other servers have loaded
		// the newest schema.
		if job.State == model.JobRunning || job.State == model.JobDone {
			d.waitSchemaChanged(waitTime)
		}

		if job.IsFinished() {
			d.startBgJob(job.Type)
			asyncNotify(d.ddlJobDoneCh)
		}
	}
}

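// chooseLeaseTime returns n1 if it is positive and falls back to n2 otherwise,
// e.g. when the lease is 0.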
func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
	if n1 > 0 {
		return n1
	}

	return n2
}

// onDDLWorker is for async online schema changes: it tries to become the owner first,
// then waits for or polls the job queue to handle schema change jobs.
func (d *ddl) onDDLWorker() {
	defer d.wait.Done()

	// we use 4 * lease time to check the owner's timeout, so here we update the owner's status
	// every 2 * lease time; if the lease is 0, we use a default of 10s.
	checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)

	ticker := time.NewTicker(checkTime)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
		case <-d.ddlJobCh:
		case <-d.quitCh:
			return
		}

		err := d.handleDDLJobQueue()
		if err != nil {
			log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
		}
	}
}

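// runDDLJob performs one state transition of the job according to its type and
// records any error in the job itself so callers can observe it.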
func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
	if job.IsFinished() {
		return
	}

	job.State = model.JobRunning

	var err error
	switch job.Type {
	case model.ActionCreateSchema:
		err = d.onCreateSchema(t, job)
	case model.ActionDropSchema:
		err = d.onDropSchema(t, job)
	case model.ActionCreateTable:
		err = d.onCreateTable(t, job)
	case model.ActionDropTable:
		err = d.onDropTable(t, job)
	case model.ActionAddColumn:
		err = d.onAddColumn(t, job)
	case model.ActionDropColumn:
		err = d.onDropColumn(t, job)
	case model.ActionAddIndex:
		err = d.onCreateIndex(t, job)
	case model.ActionDropIndex:
		err = d.onDropIndex(t, job)
	default:
		// invalid job, cancel it.
		job.State = model.JobCancelled
		err = errors.Errorf("invalid ddl job %v", job)
	}

	// save the error in the job, so that others can know that an error happened.
	if err != nil {
		// if the job is not cancelled, we should log this error.
		if job.State != model.JobCancelled {
			log.Errorf("run ddl job err %v", errors.ErrorStack(err))
		}

		job.Error = err.Error()
		job.ErrorCount++
	}
}

// the whole schema is reloaded every lease duration, so we wait 2 * lease time
// to guarantee that all servers have already updated the schema.
func (d *ddl) waitSchemaChanged(waitTime time.Duration) {
	if waitTime == 0 {
		return
	}

	select {
	case <-time.After(waitTime):
	case <-d.quitCh:
	}
}