Fix the unset partID (#799)
diff --git a/banyand/internal/sidx/block_reader_test.go b/banyand/internal/sidx/block_reader_test.go
index 83660e6..06ef81b 100644
--- a/banyand/internal/sidx/block_reader_test.go
+++ b/banyand/internal/sidx/block_reader_test.go
@@ -206,7 +206,7 @@
mpp = append(mpp, mp)
mp.mustInitFromElements(elems)
mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
- filePW := newPartWrapper(nil, mustOpenPart(partPath(tmpPath, uint64(i)), fileSystem))
+ filePW := newPartWrapper(nil, mustOpenPart(uint64(i), partPath(tmpPath, uint64(i)), fileSystem))
filePW.p.partMetadata.ID = uint64(i)
fpp = append(fpp, filePW)
pp = append(pp, filePW.p)
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 411c9e2..f514886 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -52,9 +52,9 @@
// Flush flushes the SIDX instance to disk.
Flush() error
// Merge merges the specified parts into a new part.
- Merge(closeCh <-chan struct{}) error
+ Merge(closeCh <-chan struct{}) (uint64, error)
// MergeMemPart merges the mem parts into a new part.
- MergeMemParts(closeCh <-chan struct{}) error
+ MergeMemParts(closeCh <-chan struct{}) (uint64, error)
// PartsToSync returns the parts to sync.
PartsToSync() []*part
// StreamingParts returns the streaming parts.
diff --git a/banyand/internal/sidx/iter_test.go b/banyand/internal/sidx/iter_test.go
index 53f34e6..24d6123 100644
--- a/banyand/internal/sidx/iter_test.go
+++ b/banyand/internal/sidx/iter_test.go
@@ -233,7 +233,7 @@
if partType == "file_based" {
partDir := filepath.Join(tempDir, fmt.Sprintf("%s_%s_part%d", partType, tc.name, i))
mp.mustFlush(testFS, partDir)
- testPart = mustOpenPart(partDir, testFS)
+ testPart = mustOpenPart(uint64(i), partDir, testFS)
} else {
testPart = openMemPart(mp)
}
@@ -278,7 +278,7 @@
partDir := filepath.Join(tempDir, "empty_series")
mp.mustFlush(testFS, partDir)
- testPart := mustOpenPart(partDir, testFS)
+ testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
bma := generateBlockMetadataArray()
@@ -317,9 +317,9 @@
mp1.mustFlush(testFS, partDir1)
mp2.mustFlush(testFS, partDir2)
- testPart1 := mustOpenPart(partDir1, testFS)
+ testPart1 := mustOpenPart(1, partDir1, testFS)
defer testPart1.close()
- testPart2 := mustOpenPart(partDir2, testFS)
+ testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
bma := generateBlockMetadataArray()
@@ -346,7 +346,7 @@
partDir := filepath.Join(tempDir, "single_part")
mp.mustFlush(testFS, partDir)
- testPart := mustOpenPart(partDir, testFS)
+ testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
bma := generateBlockMetadataArray()
@@ -427,9 +427,9 @@
mp1.mustFlush(testFS, partDir1)
mp2.mustFlush(testFS, partDir2)
- testPart1 := mustOpenPart(partDir1, testFS)
+ testPart1 := mustOpenPart(1, partDir1, testFS)
defer testPart1.close()
- testPart2 := mustOpenPart(partDir2, testFS)
+ testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
bma := generateBlockMetadataArray()
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index 9acbdcd..e384bd9 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -31,11 +31,11 @@
)
// Merge implements Merger interface.
-func (s *sidx) Merge(closeCh <-chan struct{}) error {
+func (s *sidx) Merge(closeCh <-chan struct{}) (uint64, error) {
// Get current snapshot
snap := s.currentSnapshot()
if snap == nil {
- return nil
+ return 0, nil
}
defer snap.decRef()
@@ -53,7 +53,7 @@
}
if len(partsToMerge) < 2 {
- return nil
+ return 0, nil
}
// Mark parts for merging
@@ -67,7 +67,7 @@
// Create new merged part
newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root)
if err != nil {
- return err
+ return 0, err
}
mergeIntro.newPart = newPart
@@ -77,13 +77,13 @@
// Wait for merge to complete
<-mergeIntro.applied
- return nil
+ return uint64(len(partsToMerge)), nil
}
-func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error {
+func (s *sidx) MergeMemParts(closeCh <-chan struct{}) (uint64, error) {
snap := s.currentSnapshot()
if snap == nil {
- return nil
+ return 0, nil
}
defer snap.decRef()
@@ -92,7 +92,7 @@
defer releaseMergerIntroduction(mergeIntro)
mergeIntro.applied = make(chan struct{})
- // Select parts to merge (all active non-memory parts)
+ // Select parts to merge (all active memory parts)
var partsToMerge []*partWrapper
for _, pw := range snap.parts {
if pw.isActive() && pw.isMemPart() {
@@ -101,7 +101,7 @@
}
if len(partsToMerge) < 2 {
- return nil
+ return 0, nil
}
// Mark parts for merging
@@ -115,7 +115,7 @@
// Create new merged part
newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root)
if err != nil {
- return err
+ return 0, err
}
mergeIntro.newPart = newPart
@@ -125,7 +125,7 @@
// Wait for merge to complete
<-mergeIntro.applied
- return nil
+ return uint64(len(partsToMerge)), nil
}
func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, error) {
@@ -158,7 +158,7 @@
}
pm.mustWriteMetadata(fileSystem, dstPath)
fileSystem.SyncPath(dstPath)
- p := mustOpenPart(dstPath, fileSystem)
+ p := mustOpenPart(partID, dstPath, fileSystem)
return newPartWrapper(nil, p), nil
}
diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go
index bf6eba9..3bdaa78 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -418,9 +418,8 @@
mp.mustInitFromElements(es)
partPath := filepath.Join(tmpPath, "part_"+string(rune('0'+i)))
mp.mustFlush(fileSystem, partPath)
- filePart := mustOpenPart(partPath, fileSystem)
+ filePart := mustOpenPart(uint64(i), partPath, fileSystem)
filePW := newPartWrapper(nil, filePart)
- filePW.p.partMetadata.ID = uint64(i)
fpp = append(fpp, filePW)
ReleaseMemPart(mp)
}
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index 3249a50..853e2c1 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -62,12 +62,12 @@
return nil
}
-func (m *mockSIDX) Merge(_ <-chan struct{}) error {
- return nil
+func (m *mockSIDX) Merge(_ <-chan struct{}) (uint64, error) {
+ return 0, nil
}
-func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error {
- return nil
+func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) (uint64, error) {
+ return 0, nil
}
func (m *mockSIDX) PartsToSync() []*part {
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 5b02efb..3413812 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -104,7 +104,7 @@
// mustOpenPart opens a part from the specified path using the given file system.
// It opens all standard files and discovers tag files automatically.
// Panics if any required file cannot be opened.
-func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
+func mustOpenPart(partID uint64, path string, fileSystem fs.FileSystem) *part {
p := &part{
path: path,
fileSystem: fileSystem,
@@ -121,6 +121,7 @@
p.close()
logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load part metadata")
}
+ p.partMetadata.ID = partID
// Load primary block metadata from primary.bin.
p.loadPrimaryBlockMetadata()
diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go
index eb67ab1..a64fdde 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -314,7 +314,7 @@
partDir := filepath.Join(tempDir, fmt.Sprintf("part_%s", tt.name))
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Run the test case
@@ -376,7 +376,7 @@
partDir := filepath.Join(tempDir, "empty_series_test")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with empty series list
@@ -417,7 +417,7 @@
partDir := filepath.Join(tempDir, "no_match_key_range")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with non-overlapping key range
@@ -458,7 +458,7 @@
partDir := filepath.Join(tempDir, "no_match_series")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with different series ID
@@ -504,7 +504,7 @@
partDir := filepath.Join(tempDir, "nil_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Test with nil blockFilter
@@ -546,7 +546,7 @@
partDir := filepath.Join(tempDir, "allow_all_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that allows all blocks
@@ -591,7 +591,7 @@
partDir := filepath.Join(tempDir, "skip_all_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that skips all blocks
@@ -636,7 +636,7 @@
partDir := filepath.Join(tempDir, "error_filter")
mp.mustFlush(testFS, partDir)
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Create a mock filter that returns an error
diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go
index 728a92c..d89f8e8 100644
--- a/banyand/internal/sidx/part_test.go
+++ b/banyand/internal/sidx/part_test.go
@@ -139,7 +139,7 @@
require.NoError(t, err)
}
- part := mustOpenPart(tempDir, testFS)
+ part := mustOpenPart(pm.ID, tempDir, testFS)
expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir)
assert.Equal(t, expectedString, part.String())
@@ -390,7 +390,7 @@
mp.mustFlush(testFS, partDir)
// Step 3: Open the flushed part from disk
- part := mustOpenPart(partDir, testFS)
+ part := mustOpenPart(1, partDir, testFS)
defer part.close()
// Step 4: Read all elements back from part
diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go
index cd589f9..42244f8 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -168,11 +168,7 @@
}
// ID returns the unique identifier of the part.
-// Returns 0 if the part is nil.
func (pw *partWrapper) ID() uint64 {
- if pw.p == nil || pw.p.partMetadata == nil {
- return 0
- }
return pw.p.partMetadata.ID
}
@@ -218,12 +214,32 @@
}
if pw.mp != nil {
+ var id uint64
+ if pw.mp.partMetadata != nil {
+ id = pw.mp.partMetadata.ID
+ }
return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, memPart=true}",
- pw.ID(), state, refCount)
+ id, state, refCount)
+ }
+
+ if pw.p == nil {
+ return fmt.Sprintf("partWrapper{id=nil, state=%s, ref=%d, part=nil}", state, refCount)
+ }
+
+ // Handle case where p.partMetadata might be nil after cleanup
+ var id uint64
+ var path string
+ if pw.p.partMetadata != nil {
+ id = pw.p.partMetadata.ID
+ }
+ if pw.p.path != "" {
+ path = pw.p.path
+ } else {
+ path = "unknown"
}
return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}",
- pw.ID(), state, refCount, pw.p.path)
+ id, state, refCount, path)
}
// overlapsKeyRange checks if the part overlaps with the given key range.
diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go
index 4ca6af4..9a1a097 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -229,7 +229,19 @@
pw := newPartWrapper(nil, p)
- // Release once (should reach 0)
+ // Test multiple releases before cleanup
+ pw.acquire() // ref count = 2
+ pw.acquire() // ref count = 3
+
+ pw.release() // ref count = 2
+ assert.Equal(t, int32(2), pw.refCount())
+ assert.True(t, pw.isActive())
+
+ pw.release() // ref count = 1
+ assert.Equal(t, int32(1), pw.refCount())
+ assert.True(t, pw.isActive())
+
+ // Final release should trigger cleanup and set ref to 0
pw.release()
assert.Equal(t, int32(0), pw.refCount())
assert.True(t, pw.isRemoved())
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 64777d6..2436b37 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -459,8 +459,7 @@
Str("part_path", partPath).
Msg("flushing sidx part")
}
- newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem))
- newPW.p.partMetadata.ID = pw.ID()
+ newPW := newPartWrapper(nil, mustOpenPart(pw.ID(), partPath, s.fileSystem))
flushIntro.flushed[newPW.ID()] = newPW
}
@@ -868,8 +867,7 @@
continue
}
partPath := partPath(s.root, id)
- part := mustOpenPart(partPath, s.fileSystem)
- part.partMetadata.ID = id
+ part := mustOpenPart(id, partPath, s.fileSystem)
pw := newPartWrapper(nil, part)
snp.addPart(pw)
if s.curPartID < id {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 76096b4..ce8338a 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -192,7 +192,7 @@
// merge memory must not be closed by the tsTable.close
closeCh := make(chan struct{})
newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
- currentMergedIDs, mergeCh, closeCh, "mem")
+ currentMergedIDs, mergeCh, closeCh, mergeTypeMem)
close(closeCh)
if err != nil {
if errors.Is(err, errClosed) {
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 148aa6f..fe2fb19 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -33,6 +33,11 @@
var mergeMaxConcurrencyCh = make(chan struct{}, cgroups.CPUs())
+var (
+ mergeTypeMem = "mem"
+ mergeTypeFile = "file"
+)
+
func (tst *tsTable) mergeLoop(merges chan *mergerIntroduction, flusherNotifier watcher.Channel) {
defer tst.loopCloser.Done()
@@ -95,7 +100,7 @@
return nil, nil
}
if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, dst,
- toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err != nil {
+ toBeMerged, merges, tst.loopCloser.CloseNotify(), mergeTypeFile); err != nil {
return dst, err
}
return dst, nil
@@ -112,12 +117,6 @@
if err != nil {
return nil, err
}
- for _, sidxInstance := range tst.getAllSidx() {
- if err := sidxInstance.MergeMemParts(closeCh); err != nil {
- tst.l.Warn().Err(err).Msg("sidx merge failed")
- return nil, err
- }
- }
elapsed := time.Since(start)
tst.incTotalMergeLatency(elapsed.Seconds(), typ)
tst.incTotalMerged(1, typ)
@@ -157,6 +156,31 @@
Msg("background merger merges unbalanced parts")
}
}
+ for sidxName, sidxInstance := range tst.getAllSidx() {
+ start = time.Now()
+ var mergedPartsCount uint64
+ var err error
+ if typ == mergeTypeMem {
+ mergedPartsCount, err = sidxInstance.MergeMemParts(closeCh)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("sidx merge mem parts failed")
+ return nil, err
+ }
+ } else {
+ mergedPartsCount, err = sidxInstance.Merge(closeCh)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("sidx merge file parts failed")
+ return nil, err
+ }
+ }
+ elapsed = time.Since(start)
+ tst.incTotalMergeLatency(elapsed.Seconds(), fmt.Sprintf("%s_%s", typ, sidxName))
+ tst.incTotalMerged(1, fmt.Sprintf("%s_%s", typ, sidxName))
+ tst.incTotalMergedParts(int(mergedPartsCount), fmt.Sprintf("%s_%s", typ, sidxName))
+ if elapsed > 30*time.Second {
+ tst.l.Warn().Uint64("mergedPartsCount", mergedPartsCount).Str("sidxName", sidxName).Dur("elapsed", elapsed).Msg("sidx merge parts took too long")
+ }
+ }
mi := generateMergerIntroduction()
defer releaseMergerIntroduction(mi)