mirror of
				https://codeberg.org/forgejo/forgejo.git
				synced 2025-10-22 10:02:29 +00:00 
			
		
		
		
	There have been a number of reports of PRs being blocked whilst being checked which have been difficult to debug. In investigating #23050 I have realised that whilst the Warn there is somewhat of a miscall there was a real bug in the way that the LevelUniqueQueue was being restored on start-up of the PersistableChannelUniqueQueue. Next there is a conflict in the setting of the internal leveldb queue name - This wasn't being set so it was being overridden by other unique queues. This PR fixes these bugs and adds a testcase. Thanks to @brechtvl for noticing the second issue. Fix #23050 and others --------- Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: techknowlogick <techknowlogick@gitea.io>
		
			
				
	
	
		
			259 lines
		
	
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			259 lines
		
	
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2023 The Gitea Authors. All rights reserved.
 | |
| // SPDX-License-Identifier: MIT
 | |
| 
 | |
| package queue
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/modules/log"
 | |
| 
 | |
| 	"github.com/stretchr/testify/assert"
 | |
| )
 | |
| 
 | |
| func TestPersistableChannelUniqueQueue(t *testing.T) {
 | |
| 	tmpDir := t.TempDir()
 | |
| 	fmt.Printf("TempDir %s\n", tmpDir)
 | |
| 	_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
 | |
| 
 | |
| 	// Common function to create the Queue
 | |
| 	newQueue := func(name string, handle func(data ...Data) []Data) Queue {
 | |
| 		q, err := NewPersistableChannelUniqueQueue(handle,
 | |
| 			PersistableChannelUniqueQueueConfiguration{
 | |
| 				Name:         name,
 | |
| 				DataDir:      tmpDir,
 | |
| 				QueueLength:  200,
 | |
| 				MaxWorkers:   1,
 | |
| 				BlockTimeout: 1 * time.Second,
 | |
| 				BoostTimeout: 5 * time.Minute,
 | |
| 				BoostWorkers: 1,
 | |
| 				Workers:      0,
 | |
| 			}, "task-0")
 | |
| 		assert.NoError(t, err)
 | |
| 		return q
 | |
| 	}
 | |
| 
 | |
| 	// runs the provided queue and provides some timer function
 | |
| 	type channels struct {
 | |
| 		readyForShutdown  chan struct{} // closed when shutdown functions have been assigned
 | |
| 		readyForTerminate chan struct{} // closed when terminate functions have been assigned
 | |
| 		signalShutdown    chan struct{} // Should close to signal shutdown
 | |
| 		doneShutdown      chan struct{} // closed when shutdown function is done
 | |
| 		queueTerminate    []func()      // list of atTerminate functions to call atTerminate - need to be accessed with lock
 | |
| 	}
 | |
| 	runQueue := func(q Queue, lock *sync.Mutex) *channels {
 | |
| 		chans := &channels{
 | |
| 			readyForShutdown:  make(chan struct{}),
 | |
| 			readyForTerminate: make(chan struct{}),
 | |
| 			signalShutdown:    make(chan struct{}),
 | |
| 			doneShutdown:      make(chan struct{}),
 | |
| 		}
 | |
| 		go q.Run(func(atShutdown func()) {
 | |
| 			go func() {
 | |
| 				lock.Lock()
 | |
| 				select {
 | |
| 				case <-chans.readyForShutdown:
 | |
| 				default:
 | |
| 					close(chans.readyForShutdown)
 | |
| 				}
 | |
| 				lock.Unlock()
 | |
| 				<-chans.signalShutdown
 | |
| 				atShutdown()
 | |
| 				close(chans.doneShutdown)
 | |
| 			}()
 | |
| 		}, func(atTerminate func()) {
 | |
| 			lock.Lock()
 | |
| 			defer lock.Unlock()
 | |
| 			select {
 | |
| 			case <-chans.readyForTerminate:
 | |
| 			default:
 | |
| 				close(chans.readyForTerminate)
 | |
| 			}
 | |
| 			chans.queueTerminate = append(chans.queueTerminate, atTerminate)
 | |
| 		})
 | |
| 
 | |
| 		return chans
 | |
| 	}
 | |
| 
 | |
| 	// call to shutdown and terminate the queue associated with the channels
 | |
| 	doTerminate := func(chans *channels, lock *sync.Mutex) {
 | |
| 		<-chans.readyForTerminate
 | |
| 
 | |
| 		lock.Lock()
 | |
| 		callbacks := []func(){}
 | |
| 		callbacks = append(callbacks, chans.queueTerminate...)
 | |
| 		lock.Unlock()
 | |
| 
 | |
| 		for _, callback := range callbacks {
 | |
| 			callback()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	mapLock := sync.Mutex{}
 | |
| 	executedInitial := map[string][]string{}
 | |
| 	hasInitial := map[string][]string{}
 | |
| 
 | |
| 	fillQueue := func(name string, done chan struct{}) {
 | |
| 		t.Run("Initial Filling: "+name, func(t *testing.T) {
 | |
| 			lock := sync.Mutex{}
 | |
| 
 | |
| 			startAt100Queued := make(chan struct{})
 | |
| 			stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
 | |
| 
 | |
| 			handle := func(data ...Data) []Data {
 | |
| 				<-startAt100Queued
 | |
| 				for _, datum := range data {
 | |
| 					s := datum.(string)
 | |
| 					mapLock.Lock()
 | |
| 					executedInitial[name] = append(executedInitial[name], s)
 | |
| 					mapLock.Unlock()
 | |
| 					if s == "task-20" {
 | |
| 						close(stopAt20Shutdown)
 | |
| 					}
 | |
| 				}
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 			q := newQueue(name, handle)
 | |
| 
 | |
| 			// add 100 tasks to the queue
 | |
| 			for i := 0; i < 100; i++ {
 | |
| 				_ = q.Push("task-" + strconv.Itoa(i))
 | |
| 			}
 | |
| 			close(startAt100Queued)
 | |
| 
 | |
| 			chans := runQueue(q, &lock)
 | |
| 
 | |
| 			<-chans.readyForShutdown
 | |
| 			<-stopAt20Shutdown
 | |
| 			close(chans.signalShutdown)
 | |
| 			<-chans.doneShutdown
 | |
| 			_ = q.Push("final")
 | |
| 
 | |
| 			// check which tasks are still in the queue
 | |
| 			for i := 0; i < 100; i++ {
 | |
| 				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
 | |
| 					mapLock.Lock()
 | |
| 					hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
 | |
| 					mapLock.Unlock()
 | |
| 				}
 | |
| 			}
 | |
| 			if has, _ := q.(UniqueQueue).Has("final"); has {
 | |
| 				mapLock.Lock()
 | |
| 				hasInitial[name] = append(hasInitial[name], "final")
 | |
| 				mapLock.Unlock()
 | |
| 			} else {
 | |
| 				assert.Fail(t, "UnqueQueue %s should have \"final\"", name)
 | |
| 			}
 | |
| 			doTerminate(chans, &lock)
 | |
| 			mapLock.Lock()
 | |
| 			assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
 | |
| 			mapLock.Unlock()
 | |
| 		})
 | |
| 		close(done)
 | |
| 	}
 | |
| 
 | |
| 	doneA := make(chan struct{})
 | |
| 	doneB := make(chan struct{})
 | |
| 
 | |
| 	go fillQueue("QueueA", doneA)
 | |
| 	go fillQueue("QueueB", doneB)
 | |
| 
 | |
| 	<-doneA
 | |
| 	<-doneB
 | |
| 
 | |
| 	executedEmpty := map[string][]string{}
 | |
| 	hasEmpty := map[string][]string{}
 | |
| 	emptyQueue := func(name string, done chan struct{}) {
 | |
| 		t.Run("Empty Queue: "+name, func(t *testing.T) {
 | |
| 			lock := sync.Mutex{}
 | |
| 			stop := make(chan struct{})
 | |
| 
 | |
| 			// collect the tasks that have been executed
 | |
| 			handle := func(data ...Data) []Data {
 | |
| 				lock.Lock()
 | |
| 				for _, datum := range data {
 | |
| 					mapLock.Lock()
 | |
| 					executedEmpty[name] = append(executedEmpty[name], datum.(string))
 | |
| 					mapLock.Unlock()
 | |
| 					if datum.(string) == "final" {
 | |
| 						close(stop)
 | |
| 					}
 | |
| 				}
 | |
| 				lock.Unlock()
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 			q := newQueue(name, handle)
 | |
| 			chans := runQueue(q, &lock)
 | |
| 
 | |
| 			<-chans.readyForShutdown
 | |
| 			<-stop
 | |
| 			close(chans.signalShutdown)
 | |
| 			<-chans.doneShutdown
 | |
| 
 | |
| 			// check which tasks are still in the queue
 | |
| 			for i := 0; i < 100; i++ {
 | |
| 				if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
 | |
| 					mapLock.Lock()
 | |
| 					hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
 | |
| 					mapLock.Unlock()
 | |
| 				}
 | |
| 			}
 | |
| 			doTerminate(chans, &lock)
 | |
| 
 | |
| 			mapLock.Lock()
 | |
| 			assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
 | |
| 			assert.Equal(t, 0, len(hasEmpty[name]))
 | |
| 			mapLock.Unlock()
 | |
| 		})
 | |
| 		close(done)
 | |
| 	}
 | |
| 
 | |
| 	doneA = make(chan struct{})
 | |
| 	doneB = make(chan struct{})
 | |
| 
 | |
| 	go emptyQueue("QueueA", doneA)
 | |
| 	go emptyQueue("QueueB", doneB)
 | |
| 
 | |
| 	<-doneA
 | |
| 	<-doneB
 | |
| 
 | |
| 	mapLock.Lock()
 | |
| 	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
 | |
| 		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
 | |
| 
 | |
| 	// reset and rerun
 | |
| 	executedInitial = map[string][]string{}
 | |
| 	hasInitial = map[string][]string{}
 | |
| 	executedEmpty = map[string][]string{}
 | |
| 	hasEmpty = map[string][]string{}
 | |
| 	mapLock.Unlock()
 | |
| 
 | |
| 	doneA = make(chan struct{})
 | |
| 	doneB = make(chan struct{})
 | |
| 
 | |
| 	go fillQueue("QueueA", doneA)
 | |
| 	go fillQueue("QueueB", doneB)
 | |
| 
 | |
| 	<-doneA
 | |
| 	<-doneB
 | |
| 
 | |
| 	doneA = make(chan struct{})
 | |
| 	doneB = make(chan struct{})
 | |
| 
 | |
| 	go emptyQueue("QueueA", doneA)
 | |
| 	go emptyQueue("QueueB", doneB)
 | |
| 
 | |
| 	<-doneA
 | |
| 	<-doneB
 | |
| 
 | |
| 	mapLock.Lock()
 | |
| 	t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
 | |
| 		len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
 | |
| 	mapLock.Unlock()
 | |
| }
 |