mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-10-24 19:12:24 +00:00
gitea.com experienced the corrupted LevelQueue bug again. I think the problem is clear now: if the keys in LevelDB went out-of-sync, the LevelQueue itself doesn't have the ability to recover, eg: * LevelQueue.Len() reports 100 * LevelQueue.LPop() reports ErrNotFound = errors.New("no key found") So it needs to dive into the LevelDB to remove all keys to recover the corrupted LevelQueue. More comments are in TestCorruptedLevelQueue.
93 lines
2.4 KiB
Go
93 lines
2.4 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/nosql"
|
|
|
|
"gitea.com/lunny/levelqueue"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
)
|
|
|
|
// baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue
|
|
type baseLevelQueuePushPoper interface {
|
|
RPush(data []byte) error
|
|
LPop() ([]byte, error)
|
|
Len() int64
|
|
}
|
|
|
|
type baseLevelQueueCommonImpl struct {
|
|
length int
|
|
internalFunc func() baseLevelQueuePushPoper
|
|
mu *sync.Mutex
|
|
}
|
|
|
|
func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
|
|
return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
|
|
if q.mu != nil {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
}
|
|
|
|
cnt := int(q.internalFunc().Len())
|
|
if cnt >= q.length {
|
|
return true, nil
|
|
}
|
|
retry, err = false, q.internalFunc().RPush(data)
|
|
if err == levelqueue.ErrAlreadyInQueue {
|
|
err = ErrAlreadyInQueue
|
|
}
|
|
return retry, err
|
|
})
|
|
}
|
|
|
|
func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error) {
|
|
return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
|
|
if q.mu != nil {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
}
|
|
|
|
data, err = q.internalFunc().LPop()
|
|
if err == levelqueue.ErrNotFound {
|
|
return true, nil, nil
|
|
}
|
|
if err != nil {
|
|
return false, nil, err
|
|
}
|
|
return false, data, nil
|
|
})
|
|
}
|
|
|
|
func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl {
|
|
return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc}
|
|
}
|
|
|
|
func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
|
|
if cfg.ConnStr == "" { // use data dir as conn str
|
|
if !filepath.IsAbs(cfg.DataFullDir) {
|
|
return "", nil, fmt.Errorf("invalid leveldb data dir (not absolute): %q", cfg.DataFullDir)
|
|
}
|
|
conn = cfg.DataFullDir
|
|
} else {
|
|
if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
|
|
return "", nil, fmt.Errorf("invalid leveldb connection string: %q", cfg.ConnStr)
|
|
}
|
|
conn = cfg.ConnStr
|
|
}
|
|
for i := 0; i < 10; i++ {
|
|
if db, err = nosql.GetManager().GetLevelDB(conn); err == nil {
|
|
break
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
return conn, db, err
|
|
}
|