1
0
Fork 0
golang-github-blevesearch-b.../index/scorch/merge.go
Daniel Baumann 982828099e
Adding upstream version 2.5.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-19 00:20:02 +02:00

637 lines
18 KiB
Go

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"context"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring/v2"
"github.com/blevesearch/bleve/v2/index/scorch/mergeplan"
"github.com/blevesearch/bleve/v2/util"
segment "github.com/blevesearch/scorch_segment_api/v2"
)
func (s *Scorch) mergerLoop() {
defer func() {
if r := recover(); r != nil {
s.fireAsyncError(&AsyncPanicError{
Source: "merger",
Path: s.path,
})
}
s.asyncTasks.Done()
}()
var lastEpochMergePlanned uint64
var ctrlMsg *mergerCtrl
mergePlannerOptions, err := s.parseMergePlannerOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
return
}
ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
options: mergePlannerOptions,
doneCh: nil}
OUTER:
for {
atomic.AddUint64(&s.stats.TotFileMergeLoopBeg, 1)
select {
case <-s.closeCh:
break OUTER
default:
// check to see if there is a new snapshot to persist
s.rootLock.Lock()
ourSnapshot := s.root
ourSnapshot.AddRef()
atomic.StoreUint64(&s.iStats.mergeSnapshotSize, uint64(ourSnapshot.Size()))
atomic.StoreUint64(&s.iStats.mergeEpoch, ourSnapshot.epoch)
s.rootLock.Unlock()
if ctrlMsg == nil && ourSnapshot.epoch != lastEpochMergePlanned {
ctrlMsg = ctrlMsgDflt
}
if ctrlMsg != nil {
continueMerge := s.fireEvent(EventKindPreMergeCheck, 0)
// The default, if there's no handler, is to continue the merge.
if !continueMerge {
// If it's decided that this merge can't take place now,
// begin the merge process all over again.
// Retry instead of blocking/waiting here since a long wait
// can result in more segments introduced i.e. s.root will
// be updated.
// decrement the ref count since its no longer needed in this
// iteration
_ = ourSnapshot.DecRef()
continue OUTER
}
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
ourSnapshot)
if err != nil {
atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
if err == segment.ErrClosed {
// index has been closed
_ = ourSnapshot.DecRef()
// continue the workloop on a user triggered cancel
if ctrlMsg.doneCh != nil {
close(ctrlMsg.doneCh)
ctrlMsg = nil
continue OUTER
}
// exit the workloop on index closure
ctrlMsg = nil
break OUTER
}
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
continue OUTER
}
if ctrlMsg.doneCh != nil {
close(ctrlMsg.doneCh)
}
ctrlMsg = nil
lastEpochMergePlanned = ourSnapshot.epoch
atomic.StoreUint64(&s.stats.LastMergedEpoch, ourSnapshot.epoch)
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()
// tell the persister we're waiting for changes
// first make a epochWatcher chan
ew := &epochWatcher{
epoch: lastEpochMergePlanned,
notifyCh: make(notificationChan, 1),
}
// give it to the persister
select {
case <-s.closeCh:
break OUTER
case s.persisterNotifier <- ew:
case ctrlMsg = <-s.forceMergeRequestCh:
continue OUTER
}
// now wait for persister (but also detect close)
select {
case <-s.closeCh:
break OUTER
case <-ew.notifyCh:
case ctrlMsg = <-s.forceMergeRequestCh:
}
}
atomic.AddUint64(&s.stats.TotFileMergeLoopEnd, 1)
}
}
type mergerCtrl struct {
ctx context.Context
options *mergeplan.MergePlanOptions
doneCh chan struct{}
}
// ForceMerge helps users trigger a merge operation on
// an online scorch index.
func (s *Scorch) ForceMerge(ctx context.Context,
mo *mergeplan.MergePlanOptions) error {
// check whether force merge is already under processing
s.rootLock.Lock()
if s.stats.TotFileMergeForceOpsStarted >
s.stats.TotFileMergeForceOpsCompleted {
s.rootLock.Unlock()
return fmt.Errorf("force merge already in progress")
}
s.stats.TotFileMergeForceOpsStarted++
s.rootLock.Unlock()
if mo != nil {
err := mergeplan.ValidateMergePlannerOptions(mo)
if err != nil {
return err
}
} else {
// assume the default single segment merge policy
mo = &mergeplan.SingleSegmentMergePlanOptions
}
msg := &mergerCtrl{options: mo,
doneCh: make(chan struct{}),
ctx: ctx,
}
// request the merger perform a force merge
select {
case s.forceMergeRequestCh <- msg:
case <-s.closeCh:
return nil
}
// wait for the force merge operation completion
select {
case <-msg.doneCh:
atomic.AddUint64(&s.stats.TotFileMergeForceOpsCompleted, 1)
case <-s.closeCh:
}
return nil
}
func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
error) {
mergePlannerOptions := mergeplan.DefaultMergePlanOptions
po, err := s.parsePersisterOptions()
if err != nil {
return nil, err
}
// by default use the MaxSizeInMemoryMergePerWorker from the persister option
// as the FloorSegmentFileSize for the merge planner which would be the
// first tier size in the planning. If the value is 0, then we don't use the
// file size in the planning.
mergePlannerOptions.FloorSegmentFileSize = int64(po.MaxSizeInMemoryMergePerWorker)
if v, ok := s.config["scorchMergePlanOptions"]; ok {
b, err := util.MarshalJSON(v)
if err != nil {
return &mergePlannerOptions, err
}
err = util.UnmarshalJSON(b, &mergePlannerOptions)
if err != nil {
return &mergePlannerOptions, err
}
err = mergeplan.ValidateMergePlannerOptions(&mergePlannerOptions)
if err != nil {
return nil, err
}
}
return &mergePlannerOptions, nil
}
type closeChWrapper struct {
ch1 chan struct{}
ctx context.Context
closeCh chan struct{}
cancelCh chan struct{}
}
func newCloseChWrapper(ch1 chan struct{},
ctx context.Context) *closeChWrapper {
return &closeChWrapper{
ch1: ch1,
ctx: ctx,
closeCh: make(chan struct{}),
cancelCh: make(chan struct{}),
}
}
func (w *closeChWrapper) close() {
close(w.closeCh)
}
func (w *closeChWrapper) listen() {
select {
case <-w.ch1:
close(w.cancelCh)
case <-w.ctx.Done():
close(w.cancelCh)
case <-w.closeCh:
}
}
func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error {
// build list of persisted segments in this snapshot
var onlyPersistedSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok {
onlyPersistedSnapshots = append(onlyPersistedSnapshots, segmentSnapshot)
}
}
atomic.AddUint64(&s.stats.TotFileMergePlan, 1)
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyPersistedSnapshots, options)
if err != nil {
atomic.AddUint64(&s.stats.TotFileMergePlanErr, 1)
return fmt.Errorf("merge planning err: %v", err)
}
if resultMergePlan == nil {
// nothing to do
atomic.AddUint64(&s.stats.TotFileMergePlanNone, 1)
return nil
}
atomic.AddUint64(&s.stats.TotFileMergePlanOk, 1)
atomic.AddUint64(&s.stats.TotFileMergePlanTasks, uint64(len(resultMergePlan.Tasks)))
// process tasks in serial for now
var filenames []string
cw := newCloseChWrapper(s.closeCh, ctx)
defer cw.close()
go cw.listen()
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
continue
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegments, uint64(len(task.Segments)))
oldMap := make(map[uint64]*SegmentSnapshot, len(task.Segments))
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]segment.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
mergedSegHistory := make(map[uint64]*mergedSegmentHistory, len(task.Segments))
for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
mergedSegHistory[segSnapshot.id] = &mergedSegmentHistory{
workerID: 0,
oldSegment: segSnapshot,
}
if persistedSeg, ok := segSnapshot.segment.(segment.PersistedSegment); ok {
if segSnapshot.LiveSize() == 0 {
atomic.AddUint64(&s.stats.TotFileMergeSegmentsEmpty, 1)
oldMap[segSnapshot.id] = nil
delete(mergedSegHistory, segSnapshot.id)
} else {
segmentsToMerge = append(segmentsToMerge, segSnapshot.segment)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
}
// track the files getting merged for unsetting the
// removal ineligibility. This helps to unflip files
// even with fast merger, slow persister work flows.
path := persistedSeg.Path()
filenames = append(filenames,
strings.TrimPrefix(path, s.path+string(os.PathSeparator)))
}
}
}
var seg segment.Segment
var filename string
if len(segmentsToMerge) > 0 {
filename = zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
fileMergeZapStartTime := time.Now()
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))
atomic.AddUint64(&s.stats.TotFileMergeZapTime, fileMergeZapTime)
if atomic.LoadUint64(&s.stats.MaxFileMergeZapTime) < fileMergeZapTime {
atomic.StoreUint64(&s.stats.MaxFileMergeZapTime, fileMergeZapTime)
}
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
if err == segment.ErrClosed {
return err
}
return fmt.Errorf("merging failed: %v", err)
}
seg, err = s.segPlugin.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return err
}
totalBytesRead := seg.BytesRead() + prevBytesReadTotal
seg.ResetBytesRead(totalBytesRead)
for i, segNewDocNums := range newDocNums {
if mergedSegHistory[task.Segments[i].Id()] != nil {
mergedSegHistory[task.Segments[i].Id()].oldNewDocIDs = segNewDocNums
}
}
atomic.AddUint64(&s.stats.TotFileMergeSegments, uint64(len(segmentsToMerge)))
}
sm := &segmentMerge{
id: []uint64{newSegmentID},
mergedSegHistory: mergedSegHistory,
new: []segment.Segment{seg},
newCount: seg.Count(),
notifyCh: make(chan *mergeTaskIntroStatus),
mmaped: 1,
}
s.fireEvent(EventKindMergeTaskIntroductionStart, 0)
// give it to the introducer
select {
case <-s.closeCh:
_ = seg.Close()
return segment.ErrClosed
case s.merges <- sm:
atomic.AddUint64(&s.stats.TotFileMergeIntroductions, 1)
}
introStartTime := time.Now()
// it is safe to blockingly wait for the merge introduction
// here as the introducer is bound to handle the notify channel.
introStatus := <-sm.notifyCh
introTime := uint64(time.Since(introStartTime))
atomic.AddUint64(&s.stats.TotFileMergeZapIntroductionTime, introTime)
if atomic.LoadUint64(&s.stats.MaxFileMergeZapIntroductionTime) < introTime {
atomic.StoreUint64(&s.stats.MaxFileMergeZapIntroductionTime, introTime)
}
atomic.AddUint64(&s.stats.TotFileMergeIntroductionsDone, 1)
if introStatus != nil && introStatus.indexSnapshot != nil {
_ = introStatus.indexSnapshot.DecRef()
if introStatus.skipped {
// close the segment on skipping introduction.
s.unmarkIneligibleForRemoval(filename)
_ = seg.Close()
}
}
atomic.AddUint64(&s.stats.TotFileMergePlanTasksDone, 1)
s.fireEvent(EventKindMergeTaskIntroduction, 0)
}
// once all the newly merged segment introductions are done,
// its safe to unflip the removal ineligibility for the replaced
// older segments
for _, f := range filenames {
s.unmarkIneligibleForRemoval(f)
}
return nil
}
type mergeTaskIntroStatus struct {
indexSnapshot *IndexSnapshot
skipped bool
}
// this is important when it comes to introducing multiple merged segments in a
// single introducer channel push. That way there is a check to ensure that the
// file count doesn't explode during the index's lifetime.
type mergedSegmentHistory struct {
workerID uint64
oldNewDocIDs []uint64
oldSegment *SegmentSnapshot
}
type segmentMerge struct {
id []uint64
new []segment.Segment
mergedSegHistory map[uint64]*mergedSegmentHistory
notifyCh chan *mergeTaskIntroStatus
mmaped uint32
newCount uint64
}
func cumulateBytesRead(sbs []segment.Segment) uint64 {
var rv uint64
for _, seg := range sbs {
rv += seg.BytesRead()
}
return rv
}
func closeNewMergedSegments(segs []segment.Segment) error {
for _, seg := range segs {
if seg != nil {
_ = seg.DecRef()
}
}
return nil
}
// mergeAndPersistInMemorySegments takes an IndexSnapshot and a list of in-memory segments,
// which are merged and persisted to disk concurrently. These are then introduced as
// the new root snapshot in one-shot.
func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot,
flushableObjs []*flushable) (*IndexSnapshot, []uint64, error) {
atomic.AddUint64(&s.stats.TotMemMergeBeg, 1)
memMergeZapStartTime := time.Now()
atomic.AddUint64(&s.stats.TotMemMergeZapBeg, 1)
var wg sync.WaitGroup
// we're tracking the merged segments and their doc number per worker
// to be able to introduce them all at once, so the first dimension of the
// slices here correspond to workerID
newDocIDsSet := make([][][]uint64, len(flushableObjs))
newMergedSegments := make([]segment.Segment, len(flushableObjs))
newMergedSegmentIDs := make([]uint64, len(flushableObjs))
numFlushes := len(flushableObjs)
var numSegments, newMergedCount uint64
var em sync.Mutex
var errs []error
// deploy the workers to merge and flush the batches of segments concurrently
// and create a new file segment
for i := 0; i < numFlushes; i++ {
wg.Add(1)
go func(segsBatch []segment.Segment, dropsBatch []*roaring.Bitmap, id int) {
defer wg.Done()
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
path := s.path + string(os.PathSeparator) + filename
// the newly merged segment is already flushed out to disk, just needs
// to be opened using mmap.
newDocIDs, _, err :=
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
if err != nil {
em.Lock()
errs = append(errs, err)
em.Unlock()
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return
}
// to prevent accidental cleanup of this newly created file, mark it
// as ineligible for removal. this will be flipped back when the bolt
// is updated - which is valid, since the snapshot updated in bolt is
// cleaned up only if its zero ref'd (MB-66163 for more details)
s.markIneligibleForRemoval(filename)
newMergedSegmentIDs[id] = newSegmentID
newDocIDsSet[id] = newDocIDs
newMergedSegments[id], err = s.segPlugin.Open(path)
if err != nil {
em.Lock()
errs = append(errs, err)
em.Unlock()
atomic.AddUint64(&s.stats.TotMemMergeErr, 1)
return
}
atomic.AddUint64(&newMergedCount, newMergedSegments[id].Count())
atomic.AddUint64(&numSegments, uint64(len(segsBatch)))
}(flushableObjs[i].segments, flushableObjs[i].drops, i)
}
wg.Wait()
if errs != nil {
// close the new merged segments
_ = closeNewMergedSegments(newMergedSegments)
var errf error
for _, err := range errs {
if err == segment.ErrClosed {
// the index snapshot was closed which will be handled gracefully
// by retrying the whole merge+flush operation in a later iteration
// so its safe to early exit the same error.
return nil, nil, err
}
errf = fmt.Errorf("%w; %v", errf, err)
}
return nil, nil, errf
}
atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1)
memMergeZapTime := uint64(time.Since(memMergeZapStartTime))
atomic.AddUint64(&s.stats.TotMemMergeZapTime, memMergeZapTime)
if atomic.LoadUint64(&s.stats.MaxMemMergeZapTime) < memMergeZapTime {
atomic.StoreUint64(&s.stats.MaxMemMergeZapTime, memMergeZapTime)
}
// update the segmentMerge task with the newly merged + flushed segments which
// are to be introduced atomically.
sm := &segmentMerge{
id: newMergedSegmentIDs,
new: newMergedSegments,
mergedSegHistory: make(map[uint64]*mergedSegmentHistory, numSegments),
notifyCh: make(chan *mergeTaskIntroStatus),
newCount: newMergedCount,
}
// create a history map which maps the old in-memory segments with the specific
// persister worker (also the specific file segment its going to be part of)
// which flushed it out. This map will be used on the introducer side to out-ref
// the in-memory segments and also track the new tombstones if present.
for i, flushable := range flushableObjs {
for j, idx := range flushable.sbIdxs {
ss := snapshot.segment[idx]
// oldSegmentSnapshot.id -> {workerID, oldSegmentSnapshot, docIDs}
sm.mergedSegHistory[ss.id] = &mergedSegmentHistory{
workerID: uint64(i),
oldNewDocIDs: newDocIDsSet[i][j],
oldSegment: ss,
}
}
}
select { // send to introducer
case <-s.closeCh:
_ = closeNewMergedSegments(newMergedSegments)
return nil, nil, segment.ErrClosed
case s.merges <- sm:
}
// blockingly wait for the introduction to complete
var newSnapshot *IndexSnapshot
introStatus := <-sm.notifyCh
if introStatus != nil && introStatus.indexSnapshot != nil {
newSnapshot = introStatus.indexSnapshot
atomic.AddUint64(&s.stats.TotMemMergeSegments, uint64(numSegments))
atomic.AddUint64(&s.stats.TotMemMergeDone, 1)
if introStatus.skipped {
// close the segment on skipping introduction.
_ = newSnapshot.DecRef()
_ = closeNewMergedSegments(newMergedSegments)
newSnapshot = nil
}
}
return newSnapshot, newMergedSegmentIDs, nil
}
func (s *Scorch) ReportBytesWritten(bytesWritten uint64) {
atomic.AddUint64(&s.stats.TotFileMergeWrittenBytes, bytesWritten)
}