Upgrade bleve from v2.0.6 to v2.3.0 (#18132)

This commit is contained in:
Lunny Xiao 2022-01-01 16:26:27 +08:00 committed by GitHub
commit 25a290e320
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
70 changed files with 1283 additions and 660 deletions

View file

@ -304,7 +304,7 @@ func (o *Builder) Close() error {
}
// fill the root bolt with this fake index snapshot
_, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin)
_, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin, nil)
if err != nil {
_ = tx.Rollback()
_ = rootBolt.Close()

View file

@ -46,6 +46,17 @@ type epochWatcher struct {
}
func (s *Scorch) introducerLoop() {
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "introducer",
Path: s.path,
})
}
s.asyncTasks.Done()
}()
var epochWatchers []*epochWatcher
OUTER:
for {
@ -88,8 +99,6 @@ OUTER:
}
epochWatchers = epochWatchersNext
}
s.asyncTasks.Done()
}
func (s *Scorch) introduceSegment(next *segmentIntroduction) error {

View file

@ -29,12 +29,22 @@ import (
)
func (s *Scorch) mergerLoop() {
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "merger",
Path: s.path,
})
}
s.asyncTasks.Done()
}()
var lastEpochMergePlanned uint64
var ctrlMsg *mergerCtrl
mergePlannerOptions, err := s.parseMergePlannerOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
s.asyncTasks.Done()
return
}
ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
@ -130,8 +140,6 @@ OUTER:
atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
}
s.asyncTasks.Done()
}
type mergerCtrl struct {
@ -209,32 +217,32 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
}
type closeChWrapper struct {
ch1 chan struct{}
ctx context.Context
closeCh chan struct{}
ch1 chan struct{}
ctx context.Context
closeCh chan struct{}
cancelCh chan struct{}
}
func newCloseChWrapper(ch1 chan struct{},
ctx context.Context) *closeChWrapper {
return &closeChWrapper{ch1: ch1,
ctx: ctx,
closeCh: make(chan struct{})}
return &closeChWrapper{
ch1: ch1,
ctx: ctx,
closeCh: make(chan struct{}),
cancelCh: make(chan struct{}),
}
}
func (w *closeChWrapper) close() {
select {
case <-w.closeCh:
default:
close(w.closeCh)
}
close(w.closeCh)
}
func (w *closeChWrapper) listen() {
select {
case <-w.ch1:
w.close()
close(w.cancelCh)
case <-w.ctx.Done():
w.close()
close(w.cancelCh)
case <-w.closeCh:
}
}
@ -320,7 +328,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.closeCh, s)
cw.cancelCh, s)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))

View file

@ -19,6 +19,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math"
@ -82,7 +83,16 @@ type persisterOptions struct {
type notificationChan chan struct{}
func (s *Scorch) persisterLoop() {
defer s.asyncTasks.Done()
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "persister",
Path: s.path,
})
}
s.asyncTasks.Done()
}()
var persistWatchers []*epochWatcher
var lastPersistedEpoch, lastMergedEpoch uint64
@ -427,8 +437,59 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
return true, nil
}
// copyToDirectory copies the regular file at srcPath into d, under
// "store/<base name of srcPath>". A nil d means there is no destination
// and the call is a no-op.
//
// It returns the number of bytes copied. The returned error is non-nil if
// the destination writer cannot be obtained, the source is missing or is
// not a regular file, the copy fails, or the destination writer fails to
// close.
func copyToDirectory(srcPath string, d index.Directory) (n int64, err error) {
	if d == nil {
		return 0, nil
	}

	dest, err := d.GetWriter(filepath.Join("store", filepath.Base(srcPath)))
	if err != nil {
		return 0, fmt.Errorf("GetWriter err: %v", err)
	}
	// Release the writer on every exit path, including the early returns
	// below. A close failure on a writer can mean the data never reached
	// the destination, so report it unless an earlier error already explains
	// the failure.
	defer func() {
		if cerr := dest.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	sourceFileStat, err := os.Stat(srcPath)
	if err != nil {
		return 0, err
	}

	if !sourceFileStat.Mode().IsRegular() {
		return 0, fmt.Errorf("%s is not a regular file", srcPath)
	}

	source, err := os.Open(srcPath)
	if err != nil {
		return 0, err
	}
	// The source is only read, so its close error carries no data-loss risk.
	defer source.Close()

	return io.Copy(dest, source)
}
// persistToDirectory writes an unpersisted segment to its destination.
//
// When d is nil the segment is persisted to path via seg.Persist. Otherwise
// the segment must implement io.WriterTo and is streamed into a writer that
// d provides for "store/<base name of path>".
//
// The returned error is non-nil if the segment does not implement
// io.WriterTo, the writer cannot be obtained, the write fails, or the
// writer fails to close.
func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory,
	path string) error {
	if d == nil {
		return seg.Persist(path)
	}

	sg, ok := seg.(io.WriterTo)
	if !ok {
		return fmt.Errorf("no io.WriterTo segment implementation found")
	}

	w, err := d.GetWriter(filepath.Join("store", filepath.Base(path)))
	if err != nil {
		return err
	}

	_, err = sg.WriteTo(w)
	// Always close the writer. A close failure may mean the segment was not
	// fully written, so surface it unless the write itself already failed.
	if cerr := w.Close(); cerr != nil && err == nil {
		err = cerr
	}

	return err
}
func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin SegmentPlugin) ([]string, map[uint64]string, error) {
segPlugin SegmentPlugin, d index.Directory) (
[]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return nil, nil, err
@ -481,7 +542,11 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
switch seg := segmentSnapshot.segment.(type) {
case segment.PersistedSegment:
segPath := seg.Path()
filename := strings.TrimPrefix(segPath, path+string(os.PathSeparator))
_, err = copyToDirectory(segPath, d)
if err != nil {
return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err)
}
filename := filepath.Base(segPath)
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return nil, nil, err
@ -490,10 +555,10 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
case segment.UnpersistedSegment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := path + string(os.PathSeparator) + filename
err = seg.Persist(path)
path := filepath.Join(path, filename)
err := persistToDirectory(seg, d, path)
if err != nil {
return nil, nil, fmt.Errorf("error persisting segment: %v", err)
return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
@ -534,7 +599,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
}()
filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin)
filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, nil)
if err != nil {
return err
}

View file

@ -75,6 +75,16 @@ type Scorch struct {
segPlugin SegmentPlugin
}
// AsyncPanicError is the error passed to the scorch asyncErrorHandler when a
// panic occurs in one of scorch's background processes.
type AsyncPanicError struct {
	// Source names the background process that panicked.
	Source string
	// Path is the path of the index that process was working on.
	Path string
}

// Error implements the error interface, reporting which background process
// panicked and for which index path.
func (e *AsyncPanicError) Error() string {
	const format = "%s panic when processing %s"
	return fmt.Sprintf(format, e.Source, e.Path)
}
type internalStats struct {
persistEpoch uint64
persistSnapshotSize uint64
@ -202,6 +212,15 @@ func (s *Scorch) openBolt() error {
var rootBoltOpt = *bolt.DefaultOptions
if s.readOnly {
rootBoltOpt.ReadOnly = true
rootBoltOpt.OpenFile = func(path string, flag int, mode os.FileMode) (*os.File, error) {
// Bolt appends an O_CREATE flag regardless.
// See - https://github.com/etcd-io/bbolt/blob/v1.3.5/db.go#L210
// Use os.O_RDONLY only if path exists (#1623)
if _, err := os.Stat(path); os.IsNotExist(err) {
return os.OpenFile(path, flag, mode)
}
return os.OpenFile(path, os.O_RDONLY, mode)
}
} else {
if s.path != "" {
err := os.MkdirAll(s.path, 0700)

View file

@ -18,6 +18,8 @@ import (
"container/heap"
"encoding/binary"
"fmt"
"os"
"path/filepath"
"reflect"
"sort"
"sync"
@ -29,6 +31,7 @@ import (
segment "github.com/blevesearch/scorch_segment_api/v2"
"github.com/blevesearch/vellum"
lev "github.com/blevesearch/vellum/levenshtein"
bolt "go.etcd.io/bbolt"
)
// re usable, threadsafe levenshtein builders
@ -426,6 +429,8 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
rvd.AddField(document.NewTextField(name, arrayPos, value))
case 'n':
rvd.AddField(document.NewNumericFieldFromBytes(name, arrayPos, value))
case 'i':
rvd.AddField(document.NewIPFieldFromBytes(name, arrayPos, value))
case 'd':
rvd.AddField(document.NewDateTimeFieldFromBytes(name, arrayPos, value))
case 'b':
@ -762,3 +767,46 @@ OUTER:
}
return rv
}
// CopyTo writes a copy of this index snapshot into d.
//
// It obtains a writer for "store/root.bolt" from d, which must be backed by
// an *os.File, opens a bolt database at that file's name, and fills it via
// prepareBoltSnapshot inside a single write transaction. Passing d to
// prepareBoltSnapshot also has the snapshot's segments written into d.
//
// The returned error is non-nil if the root.bolt writer cannot be created or
// is not an *os.File, if the bolt database cannot be opened, if preparing or
// committing the snapshot fails, if the final sync fails, or if closing the
// bolt database fails after an otherwise successful copy.
func (i *IndexSnapshot) CopyTo(d index.Directory) (err error) {
	// get the root bolt file.
	w, err := d.GetWriter(filepath.Join("store", "root.bolt"))
	if err != nil || w == nil {
		return fmt.Errorf("failed to create the root.bolt file, err: %v", err)
	}
	rootFile, ok := w.(*os.File)
	if !ok {
		// The writer is unusable for bolt, but it still has to be released.
		_ = w.Close()
		return fmt.Errorf("invalid root.bolt file found")
	}

	copyBolt, err := bolt.Open(rootFile.Name(), 0600, nil)
	if err != nil {
		_ = w.Close()
		return err
	}

	// err is a named result so that a failure to close the bolt database is
	// reported to the caller when nothing else went wrong.
	defer func() {
		_ = w.Close()
		if cerr := copyBolt.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	// start a write transaction
	tx, err := copyBolt.Begin(true)
	if err != nil {
		return err
	}

	_, _, err = prepareBoltSnapshot(i, tx, "", i.parent.segPlugin, d)
	if err != nil {
		_ = tx.Rollback()
		return fmt.Errorf("error backing up index snapshot: %v", err)
	}

	// commit bolt data
	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("error commit tx to backup root bolt: %v", err)
	}

	return copyBolt.Sync()
}

View file

@ -727,6 +727,8 @@ func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document
return document.NewBooleanFieldFromBytes(name, pos, value)
case 'g':
return document.NewGeoPointFieldFromBytes(name, pos, value)
case 'i':
return document.NewIPFieldFromBytes(name, pos, value)
}
return nil
}