// MIT License

// Copyright (c) 2018 Andy Pan

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package ants

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2/internal"
)

// Pool accepts tasks from clients; it limits the total number of goroutines
// to a given size by recycling goroutines.
type Pool struct {
	// capacity of the pool. A negative value means the capacity is limitless; an
	// infinite pool is used to avoid the potential issue of endless blocking
	// caused by nested usage of a pool: submitting a task to a pool that, in
	// turn, submits a new task to the same pool.
	capacity int32

	// running is the number of currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers stores the available workers.
	workers workerArray

	// state is used to notify the pool to close itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// workerCache speeds up obtaining a usable worker in retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already blocked on pool.Submit(),
	// protected by pool.lock.
	waiting int32

	heartbeatDone int32
	stopHeartbeat context.CancelFunc

	options *Options
}

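// The nested-submission hazard mentioned above is easiest to see in a short
// sketch. This is illustrative caller-side code, not part of this package:
// with a bounded pool, the outer task holds the only worker while the inner
// Submit blocks waiting for one, so the pool deadlocks unless it is infinite
// or constructed with WithNonblocking(true).
//
//	p, _ := NewPool(1)
//	_ = p.Submit(func() {
//		// Blocks forever: the single worker is occupied by this very task.
//		_ = p.Submit(func() {})
//	})
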
// purgePeriodically clears expired workers periodically; it runs in its own
// goroutine, as a scavenger.
func (p *Pool) purgePeriodically(ctx context.Context) {
	heartbeat := time.NewTicker(p.options.ExpiryDuration)
	defer func() {
		heartbeat.Stop()
		atomic.StoreInt32(&p.heartbeatDone, 1)
	}()

	for {
		select {
		case <-heartbeat.C:
		case <-ctx.Done():
			return
		}

		if p.IsClosed() {
			break
		}

		p.lock.Lock()
		expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
		p.lock.Unlock()

		// Notify obsolete workers to stop.
		// This notification must be outside p.lock, since w.task
		// may be blocking and may consume a lot of time if many workers
		// are located on non-local CPUs.
		for i := range expiredWorkers {
			expiredWorkers[i].task <- nil
			expiredWorkers[i] = nil
		}

		// There might be a situation where all workers have been cleaned up
		// (no worker is running), or another case where the pool capacity has
		// been tuned up while some invokers are still stuck in "p.cond.Wait()";
		// in either case we ought to wake all those invokers.
		if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
			p.cond.Broadcast()
		}
	}
}

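// The scavenger interval is Options.ExpiryDuration, which defaults to
// DefaultCleanIntervalTime in NewPool below. A minimal sketch of overriding it
// at construction time (WithExpiryDuration is assumed to be the Option
// constructor that sets Options.ExpiryDuration; it is defined elsewhere in
// this package):
//
//	p, _ := NewPool(100, WithExpiryDuration(10*time.Second))
//	defer p.Release()
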
// NewPool generates an instance of an ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
	opts := loadOptions(options...)

	if size <= 0 {
		size = -1
	}

	if expiry := opts.ExpiryDuration; expiry < 0 {
		return nil, ErrInvalidPoolExpiry
	} else if expiry == 0 {
		opts.ExpiryDuration = DefaultCleanIntervalTime
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &Pool{
		capacity: int32(size),
		lock:     internal.NewSpinLock(),
		options:  opts,
	}
	p.workerCache.New = func() interface{} {
		return &goWorker{
			pool: p,
			task: make(chan func(), workerChanCap),
		}
	}
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerArray(loopQueueType, size)
	} else {
		p.workers = newWorkerArray(stackType, 0)
	}

	p.cond = sync.NewCond(p.lock)

	// Start a goroutine to clean up expired workers periodically.
	var ctx context.Context
	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
	go p.purgePeriodically(ctx)

	return p, nil
}

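// A minimal construction sketch. WithPreAlloc is assumed to be the Option
// constructor that sets Options.PreAlloc (defined elsewhere in this package);
// note that a non-positive size yields an infinite pool, which the checks
// above reject in combination with pre-allocation:
//
//	p, err := NewPool(1000, WithPreAlloc(true)) // loop-queue worker array
//	if err != nil {
//		panic(err)
//	}
//	defer p.Release()
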
// ---------------------------------------------------------------------------

// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from within a task running
// on the same Pool, but bear in mind that the nested Pool.Submit() call will
// block once the Pool runs out of capacity; to avoid this, you should
// instantiate the Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}
	var w *goWorker
	if w = p.retrieveWorker(); w == nil {
		return ErrPoolOverload
	}
	w.task <- task
	return nil
}

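// A minimal sketch of submitting work and waiting for it to finish; the
// sync.WaitGroup belongs to the caller, not to this package:
//
//	p, _ := NewPool(10)
//	defer p.Release()
//
//	var wg sync.WaitGroup
//	for i := 0; i < 100; i++ {
//		wg.Add(1)
//		_ = p.Submit(func() {
//			defer wg.Done()
//			// ... do work ...
//		})
//	}
//	wg.Wait()
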
// Running returns the number of workers currently running.
func (p *Pool) Running() int {
	return int(atomic.LoadInt32(&p.running))
}

// Free returns the number of workers available to run tasks; -1 indicates
// this pool is unlimited.
func (p *Pool) Free() int {
	c := p.Cap()
	if c < 0 {
		return -1
	}
	return c - p.Running()
}

// Waiting returns the number of tasks which are waiting to be executed.
func (p *Pool) Waiting() int {
	return int(atomic.LoadInt32(&p.waiting))
}

// Cap returns the capacity of this pool.
func (p *Pool) Cap() int {
	return int(atomic.LoadInt32(&p.capacity))
}

// Tune changes the capacity of this pool; note that it has no effect on an
// infinite or pre-allocated pool.
func (p *Pool) Tune(size int) {
	capacity := p.Cap()
	if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
	if size > capacity {
		if size-capacity == 1 {
			p.cond.Signal()
			return
		}
		p.cond.Broadcast()
	}
}

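// A minimal sketch of resizing at runtime; growing by exactly one slot wakes
// at most one blocked submitter, while a larger growth wakes them all, as the
// Signal/Broadcast split above shows:
//
//	p, _ := NewPool(10)
//	p.Tune(11) // wakes at most one caller blocked in Submit
//	p.Tune(50) // wakes all blocked callers
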
// IsClosed indicates whether the pool is closed.
func (p *Pool) IsClosed() bool {
	return atomic.LoadInt32(&p.state) == CLOSED
}

// Release closes this pool and releases the worker queue.
func (p *Pool) Release() {
	if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) {
		return
	}
	p.lock.Lock()
	p.workers.reset()
	p.lock.Unlock()
	// There might be some callers waiting in retrieveWorker(), so we need to
	// wake them up to prevent them from blocking infinitely.
	p.cond.Broadcast()
}

// ReleaseTimeout is like Release but with a timeout: it waits for all workers
// to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
	if p.IsClosed() || p.stopHeartbeat == nil {
		return ErrPoolClosed
	}

	p.stopHeartbeat()
	p.stopHeartbeat = nil
	p.Release()

	endTime := time.Now().Add(timeout)
	for time.Now().Before(endTime) {
		if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return ErrTimeout
}

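// A minimal graceful-shutdown sketch: give in-flight tasks a bounded window to
// drain, then optionally bring the same pool back with Reboot below:
//
//	if err := p.ReleaseTimeout(3 * time.Second); err != nil {
//		// ErrTimeout: some workers were still running when the window closed.
//	}
//	p.Reboot() // the pool accepts Submit again
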
// Reboot reboots a closed pool.
func (p *Pool) Reboot() {
	if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
		atomic.StoreInt32(&p.heartbeatDone, 0)
		var ctx context.Context
		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
		go p.purgePeriodically(ctx)
	}
}

// ---------------------------------------------------------------------------

// addRunning atomically adjusts the count of running workers by delta.
func (p *Pool) addRunning(delta int) {
	atomic.AddInt32(&p.running, int32(delta))
}

// addWaiting atomically adjusts the count of blocked submitters by delta.
func (p *Pool) addWaiting(delta int) {
	atomic.AddInt32(&p.waiting, int32(delta))
}

// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
	spawnWorker := func() {
		w = p.workerCache.Get().(*goWorker)
		w.run()
	}

	p.lock.Lock()

	w = p.workers.detach()
	if w != nil { // first try to fetch a worker from the queue
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		// If the worker queue is empty and we haven't run out of pool capacity,
		// just spawn a new worker goroutine.
		p.lock.Unlock()
		spawnWorker()
	} else { // otherwise, keep the caller blocked until at least one worker is put back into the pool
		if p.options.Nonblocking {
			p.lock.Unlock()
			return
		}
	retry:
		if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
			p.lock.Unlock()
			return
		}
		p.addWaiting(1)
		p.cond.Wait() // block and wait for an available worker
		p.addWaiting(-1)

		if p.IsClosed() {
			p.lock.Unlock()
			return
		}

		var nw int
		if nw = p.Running(); nw == 0 { // awakened by the scavenger
			p.lock.Unlock()
			spawnWorker()
			return
		}
		if w = p.workers.detach(); w == nil {
			if nw < p.Cap() {
				p.lock.Unlock()
				spawnWorker()
				return
			}
			goto retry
		}
		p.lock.Unlock()
	}
	return
}

// revertWorker puts a worker back into the free pool, recycling the goroutine.
func (p *Pool) revertWorker(worker *goWorker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}
	worker.recycleTime = time.Now()
	p.lock.Lock()

	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}

	err := p.workers.insert(worker)
	if err != nil {
		p.lock.Unlock()
		return false
	}

	// Notify the invoker stuck in 'retrieveWorker()' that there is an available
	// worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()
	return true
}