mirror of
				https://codeberg.org/forgejo/forgejo.git
				synced 2025-11-04 08:21:11 +00:00 
			
		
		
		
	Although some features are mixed together in this PR, the PR is not
that large, and the features are all related.
In fact, more than 70 of its lines are for a toy "test queue", so
this PR is quite simple.
Major features:
1. Allow site admin to clear a queue (remove all items in a queue)
* Because there is no transaction, the "unique queue" could be corrupted
in rare cases, and that is unfixable.
* e.g., the item is in the "set" but not in the "list", so the item can
never be pushed into the queue.
* Now a site admin can simply clear the queue, after which everything
becomes correct again; the lost items can be re-pushed into the queue by
future operations.
3. Split the "admin/monitor" page into separate pages
4. Allow downloading a diagnosis report
* Historically, many users have reported that the Gitea queue gets
stuck, or that Gitea's CPU usage is at 100%
    * With the diagnosis report, maintainers can see clearly what is happening
A sample diagnosis report:
[gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip)
(use "go tool pprof profile.dat" to view the report).
Screenshots:



---------
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: Giteabot <teabot@gitea.io>
		
	
			
		
			
				
	
	
		
			113 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			113 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2019 The Gitea Authors. All rights reserved.
 | 
						|
// SPDX-License-Identifier: MIT
 | 
						|
 | 
						|
package queue
 | 
						|
 | 
						|
import (
	"context"
	"errors"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
)
 | 
						|
 | 
						|
// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
 | 
						|
type Manager struct {
 | 
						|
	mu sync.Mutex
 | 
						|
 | 
						|
	qidCounter int64
 | 
						|
	Queues     map[int64]ManagedWorkerPoolQueue
 | 
						|
}
 | 
						|
 | 
						|
// ManagedWorkerPoolQueue is the set of operations a queue must provide to be
// registered with a Manager: descriptive getters, worker-count accessors, and
// the flush / remove-all maintenance operations.
type ManagedWorkerPoolQueue interface {
	GetName() string
	GetType() string
	GetItemTypeName() string
	GetWorkerNumber() int
	GetWorkerActiveNumber() int
	GetWorkerMaxNumber() int
	SetWorkerMaxNumber(num int)
	GetQueueItemNumber() int

	// FlushWithContext tries to make the handler process all items in the queue synchronously.
	// It is for testing purpose only. It's not designed to be used in a cluster.
	FlushWithContext(ctx context.Context, timeout time.Duration) error

	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
	RemoveAllItems(ctx context.Context) error
}
 | 
						|
 | 
						|
var manager *Manager
 | 
						|
 | 
						|
func init() {
 | 
						|
	manager = &Manager{
 | 
						|
		Queues: make(map[int64]ManagedWorkerPoolQueue),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func GetManager() *Manager {
 | 
						|
	return manager
 | 
						|
}
 | 
						|
 | 
						|
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
 | 
						|
	m.mu.Lock()
 | 
						|
	defer m.mu.Unlock()
 | 
						|
	m.qidCounter++
 | 
						|
	m.Queues[m.qidCounter] = managed
 | 
						|
}
 | 
						|
 | 
						|
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
 | 
						|
	m.mu.Lock()
 | 
						|
	defer m.mu.Unlock()
 | 
						|
	return m.Queues[qid]
 | 
						|
}
 | 
						|
 | 
						|
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
 | 
						|
	m.mu.Lock()
 | 
						|
	defer m.mu.Unlock()
 | 
						|
 | 
						|
	queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
 | 
						|
	for k, v := range m.Queues {
 | 
						|
		queues[k] = v
 | 
						|
	}
 | 
						|
	return queues
 | 
						|
}
 | 
						|
 | 
						|
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
 | 
						|
// It is for testing purpose only. It's not designed to be used in a cluster.
 | 
						|
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
 | 
						|
	var finalErr error
 | 
						|
	qs := m.ManagedQueues()
 | 
						|
	for _, q := range qs {
 | 
						|
		if err := q.FlushWithContext(ctx, timeout); err != nil {
 | 
						|
			finalErr = err // TODO: in Go 1.20: errors.Join
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return finalErr
 | 
						|
}
 | 
						|
 | 
						|
// CreateSimpleQueue creates a simple queue from global setting config provider by name
 | 
						|
func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
 | 
						|
	return createWorkerPoolQueue(name, setting.CfgProvider, handler, false)
 | 
						|
}
 | 
						|
 | 
						|
// CreateUniqueQueue creates a unique queue from global setting config provider by name
 | 
						|
func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
 | 
						|
	return createWorkerPoolQueue(name, setting.CfgProvider, handler, true)
 | 
						|
}
 | 
						|
 | 
						|
func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
 | 
						|
	queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
 | 
						|
	if err != nil {
 | 
						|
		log.Error("Failed to get queue settings for %q: %v", name, err)
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
 | 
						|
	if err != nil {
 | 
						|
		log.Error("Failed to create queue %q: %v", name, err)
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	GetManager().AddManagedQueue(w)
 | 
						|
	return w
 | 
						|
}
 |