Fix the unset partID (#799)

diff --git a/banyand/internal/sidx/block_reader_test.go b/banyand/internal/sidx/block_reader_test.go
index 83660e6..06ef81b 100644
--- a/banyand/internal/sidx/block_reader_test.go
+++ b/banyand/internal/sidx/block_reader_test.go
@@ -206,7 +206,7 @@
 					mpp = append(mpp, mp)
 					mp.mustInitFromElements(elems)
 					mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
-					filePW := newPartWrapper(nil, mustOpenPart(partPath(tmpPath, uint64(i)), fileSystem))
+					filePW := newPartWrapper(nil, mustOpenPart(uint64(i), partPath(tmpPath, uint64(i)), fileSystem))
 					filePW.p.partMetadata.ID = uint64(i)
 					fpp = append(fpp, filePW)
 					pp = append(pp, filePW.p)
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 411c9e2..f514886 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -52,9 +52,9 @@
 	// Flush flushes the SIDX instance to disk.
 	Flush() error
 	// Merge merges the specified parts into a new part.
-	Merge(closeCh <-chan struct{}) error
+	Merge(closeCh <-chan struct{}) (uint64, error)
 	// MergeMemPart merges the mem parts into a new part.
-	MergeMemParts(closeCh <-chan struct{}) error
+	MergeMemParts(closeCh <-chan struct{}) (uint64, error)
 	// PartsToSync returns the parts to sync.
 	PartsToSync() []*part
 	// StreamingParts returns the streaming parts.
diff --git a/banyand/internal/sidx/iter_test.go b/banyand/internal/sidx/iter_test.go
index 53f34e6..24d6123 100644
--- a/banyand/internal/sidx/iter_test.go
+++ b/banyand/internal/sidx/iter_test.go
@@ -233,7 +233,7 @@
 						if partType == "file_based" {
 							partDir := filepath.Join(tempDir, fmt.Sprintf("%s_%s_part%d", partType, tc.name, i))
 							mp.mustFlush(testFS, partDir)
-							testPart = mustOpenPart(partDir, testFS)
+							testPart = mustOpenPart(uint64(i), partDir, testFS)
 						} else {
 							testPart = openMemPart(mp)
 						}
@@ -278,7 +278,7 @@
 
 		partDir := filepath.Join(tempDir, "empty_series")
 		mp.mustFlush(testFS, partDir)
-		testPart := mustOpenPart(partDir, testFS)
+		testPart := mustOpenPart(1, partDir, testFS)
 		defer testPart.close()
 
 		bma := generateBlockMetadataArray()
@@ -317,9 +317,9 @@
 		mp1.mustFlush(testFS, partDir1)
 		mp2.mustFlush(testFS, partDir2)
 
-		testPart1 := mustOpenPart(partDir1, testFS)
+		testPart1 := mustOpenPart(1, partDir1, testFS)
 		defer testPart1.close()
-		testPart2 := mustOpenPart(partDir2, testFS)
+		testPart2 := mustOpenPart(2, partDir2, testFS)
 		defer testPart2.close()
 
 		bma := generateBlockMetadataArray()
@@ -346,7 +346,7 @@
 
 		partDir := filepath.Join(tempDir, "single_part")
 		mp.mustFlush(testFS, partDir)
-		testPart := mustOpenPart(partDir, testFS)
+		testPart := mustOpenPart(1, partDir, testFS)
 		defer testPart.close()
 
 		bma := generateBlockMetadataArray()
@@ -427,9 +427,9 @@
 		mp1.mustFlush(testFS, partDir1)
 		mp2.mustFlush(testFS, partDir2)
 
-		testPart1 := mustOpenPart(partDir1, testFS)
+		testPart1 := mustOpenPart(1, partDir1, testFS)
 		defer testPart1.close()
-		testPart2 := mustOpenPart(partDir2, testFS)
+		testPart2 := mustOpenPart(2, partDir2, testFS)
 		defer testPart2.close()
 
 		bma := generateBlockMetadataArray()
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index 9acbdcd..e384bd9 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -31,11 +31,11 @@
 )
 
 // Merge implements Merger interface.
-func (s *sidx) Merge(closeCh <-chan struct{}) error {
+func (s *sidx) Merge(closeCh <-chan struct{}) (uint64, error) {
 	// Get current snapshot
 	snap := s.currentSnapshot()
 	if snap == nil {
-		return nil
+		return 0, nil
 	}
 	defer snap.decRef()
 
@@ -53,7 +53,7 @@
 	}
 
 	if len(partsToMerge) < 2 {
-		return nil
+		return 0, nil
 	}
 
 	// Mark parts for merging
@@ -67,7 +67,7 @@
 	// Create new merged part
 	newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	mergeIntro.newPart = newPart
 
@@ -77,13 +77,13 @@
 	// Wait for merge to complete
 	<-mergeIntro.applied
 
-	return nil
+	return uint64(len(partsToMerge)), nil
 }
 
-func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error {
+func (s *sidx) MergeMemParts(closeCh <-chan struct{}) (uint64, error) {
 	snap := s.currentSnapshot()
 	if snap == nil {
-		return nil
+		return 0, nil
 	}
 	defer snap.decRef()
 
@@ -92,7 +92,7 @@
 	defer releaseMergerIntroduction(mergeIntro)
 	mergeIntro.applied = make(chan struct{})
 
-	// Select parts to merge (all active non-memory parts)
+	// Select parts to merge (all active memory parts)
 	var partsToMerge []*partWrapper
 	for _, pw := range snap.parts {
 		if pw.isActive() && pw.isMemPart() {
@@ -101,7 +101,7 @@
 	}
 
 	if len(partsToMerge) < 2 {
-		return nil
+		return 0, nil
 	}
 
 	// Mark parts for merging
@@ -115,7 +115,7 @@
 	// Create new merged part
 	newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	mergeIntro.newPart = newPart
 
@@ -125,7 +125,7 @@
 	// Wait for merge to complete
 	<-mergeIntro.applied
 
-	return nil
+	return uint64(len(partsToMerge)), nil
 }
 
 func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, error) {
@@ -158,7 +158,7 @@
 	}
 	pm.mustWriteMetadata(fileSystem, dstPath)
 	fileSystem.SyncPath(dstPath)
-	p := mustOpenPart(dstPath, fileSystem)
+	p := mustOpenPart(partID, dstPath, fileSystem)
 
 	return newPartWrapper(nil, p), nil
 }
diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go
index bf6eba9..3bdaa78 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -418,9 +418,8 @@
 					mp.mustInitFromElements(es)
 					partPath := filepath.Join(tmpPath, "part_"+string(rune('0'+i)))
 					mp.mustFlush(fileSystem, partPath)
-					filePart := mustOpenPart(partPath, fileSystem)
+					filePart := mustOpenPart(uint64(i), partPath, fileSystem)
 					filePW := newPartWrapper(nil, filePart)
-					filePW.p.partMetadata.ID = uint64(i)
 					fpp = append(fpp, filePW)
 					ReleaseMemPart(mp)
 				}
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index 3249a50..853e2c1 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -62,12 +62,12 @@
 	return nil
 }
 
-func (m *mockSIDX) Merge(_ <-chan struct{}) error {
-	return nil
+func (m *mockSIDX) Merge(_ <-chan struct{}) (uint64, error) {
+	return 0, nil
 }
 
-func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error {
-	return nil
+func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) (uint64, error) {
+	return 0, nil
 }
 
 func (m *mockSIDX) PartsToSync() []*part {
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 5b02efb..3413812 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -104,7 +104,7 @@
 // mustOpenPart opens a part from the specified path using the given file system.
 // It opens all standard files and discovers tag files automatically.
 // Panics if any required file cannot be opened.
-func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
+func mustOpenPart(partID uint64, path string, fileSystem fs.FileSystem) *part {
 	p := &part{
 		path:       path,
 		fileSystem: fileSystem,
@@ -121,6 +121,7 @@
 		p.close()
 		logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load part metadata")
 	}
+	p.partMetadata.ID = partID
 
 	// Load primary block metadata from primary.bin.
 	p.loadPrimaryBlockMetadata()
diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go
index eb67ab1..a64fdde 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -314,7 +314,7 @@
 				partDir := filepath.Join(tempDir, fmt.Sprintf("part_%s", tt.name))
 				mp.mustFlush(testFS, partDir)
 
-				part := mustOpenPart(partDir, testFS)
+				part := mustOpenPart(1, partDir, testFS)
 				defer part.close()
 
 				// Run the test case
@@ -376,7 +376,7 @@
 		partDir := filepath.Join(tempDir, "empty_series_test")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Test with empty series list
@@ -417,7 +417,7 @@
 		partDir := filepath.Join(tempDir, "no_match_key_range")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Test with non-overlapping key range
@@ -458,7 +458,7 @@
 		partDir := filepath.Join(tempDir, "no_match_series")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Test with different series ID
@@ -504,7 +504,7 @@
 		partDir := filepath.Join(tempDir, "nil_filter")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Test with nil blockFilter
@@ -546,7 +546,7 @@
 		partDir := filepath.Join(tempDir, "allow_all_filter")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Create a mock filter that allows all blocks
@@ -591,7 +591,7 @@
 		partDir := filepath.Join(tempDir, "skip_all_filter")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Create a mock filter that skips all blocks
@@ -636,7 +636,7 @@
 		partDir := filepath.Join(tempDir, "error_filter")
 		mp.mustFlush(testFS, partDir)
 
-		part := mustOpenPart(partDir, testFS)
+		part := mustOpenPart(1, partDir, testFS)
 		defer part.close()
 
 		// Create a mock filter that returns an error
diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go
index 728a92c..d89f8e8 100644
--- a/banyand/internal/sidx/part_test.go
+++ b/banyand/internal/sidx/part_test.go
@@ -139,7 +139,7 @@
 		require.NoError(t, err)
 	}
 
-	part := mustOpenPart(tempDir, testFS)
+	part := mustOpenPart(pm.ID, tempDir, testFS)
 
 	expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir)
 	assert.Equal(t, expectedString, part.String())
@@ -390,7 +390,7 @@
 			mp.mustFlush(testFS, partDir)
 
 			// Step 3: Open the flushed part from disk
-			part := mustOpenPart(partDir, testFS)
+			part := mustOpenPart(1, partDir, testFS)
 			defer part.close()
 
 			// Step 4: Read all elements back from part
diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go
index cd589f9..42244f8 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -168,11 +168,7 @@
 }
 
 // ID returns the unique identifier of the part.
-// Returns 0 if the part is nil.
 func (pw *partWrapper) ID() uint64 {
-	if pw.p == nil || pw.p.partMetadata == nil {
-		return 0
-	}
 	return pw.p.partMetadata.ID
 }
 
@@ -218,12 +214,32 @@
 	}
 
 	if pw.mp != nil {
+		var id uint64
+		if pw.mp.partMetadata != nil {
+			id = pw.mp.partMetadata.ID
+		}
 		return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, memPart=true}",
-			pw.ID(), state, refCount)
+			id, state, refCount)
+	}
+
+	if pw.p == nil {
+		return fmt.Sprintf("partWrapper{id=nil, state=%s, ref=%d, part=nil}", state, refCount)
+	}
+
+	// Handle case where p.partMetadata might be nil after cleanup
+	var id uint64
+	var path string
+	if pw.p.partMetadata != nil {
+		id = pw.p.partMetadata.ID
+	}
+	if pw.p.path != "" {
+		path = pw.p.path
+	} else {
+		path = "unknown"
 	}
 
 	return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}",
-		pw.ID(), state, refCount, pw.p.path)
+		id, state, refCount, path)
 }
 
 // overlapsKeyRange checks if the part overlaps with the given key range.
diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go
index 4ca6af4..9a1a097 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -229,7 +229,19 @@
 
 	pw := newPartWrapper(nil, p)
 
-	// Release once (should reach 0)
+	// Test multiple releases before cleanup
+	pw.acquire() // ref count = 2
+	pw.acquire() // ref count = 3
+
+	pw.release() // ref count = 2
+	assert.Equal(t, int32(2), pw.refCount())
+	assert.True(t, pw.isActive())
+
+	pw.release() // ref count = 1
+	assert.Equal(t, int32(1), pw.refCount())
+	assert.True(t, pw.isActive())
+
+	// Final release should trigger cleanup and set ref to 0
 	pw.release()
 	assert.Equal(t, int32(0), pw.refCount())
 	assert.True(t, pw.isRemoved())
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 64777d6..2436b37 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -459,8 +459,7 @@
 				Str("part_path", partPath).
 				Msg("flushing sidx part")
 		}
-		newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem))
-		newPW.p.partMetadata.ID = pw.ID()
+		newPW := newPartWrapper(nil, mustOpenPart(pw.ID(), partPath, s.fileSystem))
 		flushIntro.flushed[newPW.ID()] = newPW
 	}
 
@@ -868,8 +867,7 @@
 			continue
 		}
 		partPath := partPath(s.root, id)
-		part := mustOpenPart(partPath, s.fileSystem)
-		part.partMetadata.ID = id
+		part := mustOpenPart(id, partPath, s.fileSystem)
 		pw := newPartWrapper(nil, part)
 		snp.addPart(pw)
 		if s.curPartID < id {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 76096b4..ce8338a 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -192,7 +192,7 @@
 		// merge memory must not be closed by the tsTable.close
 		closeCh := make(chan struct{})
 		newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-			currentMergedIDs, mergeCh, closeCh, "mem")
+			currentMergedIDs, mergeCh, closeCh, mergeTypeMem)
 		close(closeCh)
 		if err != nil {
 			if errors.Is(err, errClosed) {
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 148aa6f..fe2fb19 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -33,6 +33,11 @@
 
 var mergeMaxConcurrencyCh = make(chan struct{}, cgroups.CPUs())
 
+var (
+	mergeTypeMem  = "mem"
+	mergeTypeFile = "file"
+)
+
 func (tst *tsTable) mergeLoop(merges chan *mergerIntroduction, flusherNotifier watcher.Channel) {
 	defer tst.loopCloser.Done()
 
@@ -95,7 +100,7 @@
 		return nil, nil
 	}
 	if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, dst,
-		toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err != nil {
+		toBeMerged, merges, tst.loopCloser.CloseNotify(), mergeTypeFile); err != nil {
 		return dst, err
 	}
 	return dst, nil
@@ -112,12 +117,6 @@
 	if err != nil {
 		return nil, err
 	}
-	for _, sidxInstance := range tst.getAllSidx() {
-		if err := sidxInstance.MergeMemParts(closeCh); err != nil {
-			tst.l.Warn().Err(err).Msg("sidx merge failed")
-			return nil, err
-		}
-	}
 	elapsed := time.Since(start)
 	tst.incTotalMergeLatency(elapsed.Seconds(), typ)
 	tst.incTotalMerged(1, typ)
@@ -157,6 +156,31 @@
 				Msg("background merger merges unbalanced parts")
 		}
 	}
+	for sidxName, sidxInstance := range tst.getAllSidx() {
+		start = time.Now()
+		var mergedPartsCount uint64
+		var err error
+		if typ == mergeTypeMem {
+			mergedPartsCount, err = sidxInstance.MergeMemParts(closeCh)
+			if err != nil {
+				tst.l.Warn().Err(err).Msg("sidx merge mem parts failed")
+				return nil, err
+			}
+		} else {
+			mergedPartsCount, err = sidxInstance.Merge(closeCh)
+			if err != nil {
+				tst.l.Warn().Err(err).Msg("sidx merge file parts failed")
+				return nil, err
+			}
+		}
+		elapsed = time.Since(start)
+		tst.incTotalMergeLatency(elapsed.Seconds(), fmt.Sprintf("%s_%s", typ, sidxName))
+		tst.incTotalMerged(1, fmt.Sprintf("%s_%s", typ, sidxName))
+		tst.incTotalMergedParts(int(mergedPartsCount), fmt.Sprintf("%s_%s", typ, sidxName))
+		if elapsed > 30*time.Second {
+			tst.l.Warn().Uint64("mergedPartsCount", mergedPartsCount).Str("sidxName", sidxName).Dur("elapsed", elapsed).Msg("sidx merge parts took too long")
+		}
+	}
 
 	mi := generateMergerIntroduction()
 	defer releaseMergerIntroduction(mi)