Process query from liaison (#18)

* add processor

* use custom analyzer

* try add testcase

* fix license

* fix test

* remove fmt.Sprintf usage in the read/write path

* fix processor

* collect chunks from all shards

* fix ci

* finish indexScan with shards

* change IndexSearch API

* remove todo

* refactor mock

* add Write API

* refactor traceIDFetch

* add log in queryProcessor

* stop loop if chunkSet is empty

* get field names from schema

* add TODO

* generate random dir for each test

* include os.RemoveAll in return func

* fix typo
diff --git a/Makefile b/Makefile
index 73e7946..1314666 100644
--- a/Makefile
+++ b/Makefile
@@ -44,6 +44,7 @@
 ##@ Test targets
 
 test: TARGET=test
+test: PROJECTS:=$(PROJECTS) pkg
 test: default          ## Run the unit tests in all projects
 
 test-race: TARGET=test-race
diff --git a/api/event/query.go b/api/event/query.go
new file mode 100644
index 0000000..be842a9
--- /dev/null
+++ b/api/event/query.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package event
+
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+var (
+	QueryEventKindVersion = common.KindVersion{
+		Version: "v1",
+		Kind:    "event-query",
+	}
+	// TopicQueryEvent is a bidirectional topic for request/response communication
+	// between Liaison and Query module
+	TopicQueryEvent = bus.BiTopic(QueryEventKindVersion.String())
+)
diff --git a/banyand/index/index.go b/banyand/index/index.go
index a21bd85..6ab47f2 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
 package index
 
 import (
@@ -35,8 +34,9 @@
 	Op     apiv1.PairQuery_BinaryOp
 }
 
+//go:generate mockgen -destination=./index_mock.go -package=index . Repo
 type Repo interface {
-	Search(series common.Metadata, shardID uint, startTime, endTime uint64, conditions []Condition) (posting.List, error)
+	Search(seriesMeta common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error)
 }
 
 type Builder interface {
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 2d50601..b024cbe 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -65,7 +65,7 @@
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate trace series")
 	}
-	q, err := query.NewExecutor(ctx, idx, traceSeries)
+	q, err := query.NewExecutor(ctx, repo, idx, traceSeries, traceSeries)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate query executor")
 	}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
new file mode 100644
index 0000000..2bbb121
--- /dev/null
+++ b/banyand/query/processor.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/event"
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	apischema "github.com/apache/skywalking-banyandb/api/schema"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/series"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/query/executor"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+const (
+	moduleName = "query-processor"
+)
+
+var (
+	_ Executor                  = (*queryProcessor)(nil)
+	_ bus.MessageListener       = (*queryProcessor)(nil)
+	_ executor.ExecutionContext = (*queryProcessor)(nil)
+)
+
+type queryProcessor struct {
+	index.Repo
+	series.UniModel
+	logger      *logger.Logger
+	schemaRepo  series.SchemaRepo
+	log         *logger.Logger
+	serviceRepo discovery.ServiceRepo
+}
+
+func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
+	queryCriteria, ok := message.Data().(*v1.QueryRequest)
+	if !ok {
+		q.log.Warn().Msg("invalid event data type")
+		return
+	}
+	q.log.Info().
+		Msg("received a query event")
+	analyzer := logical.DefaultAnalyzer()
+	metadata := &common.Metadata{
+		KindVersion: apischema.SeriesKindVersion,
+		Spec:        queryCriteria.GetMetadata(),
+	}
+	s, err := analyzer.BuildTraceSchema(context.TODO(), *metadata)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to build trace schema")
+		return
+	}
+
+	p, err := analyzer.Analyze(context.TODO(), queryCriteria, metadata, s)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to analyze the query request")
+		return
+	}
+
+	entities, err := p.Execute(q)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to execute the query plan")
+		return
+	}
+
+	now := time.Now().UnixNano()
+	resp = bus.NewMessage(bus.MessageID(now), entities)
+
+	return
+}
+
+func (q *queryProcessor) Name() string {
+	return moduleName
+}
+
+func (q *queryProcessor) PreRun() error {
+	q.log = logger.GetLogger(moduleName)
+	return q.serviceRepo.Subscribe(event.TopicQueryEvent, q)
+}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
new file mode 100644
index 0000000..7ebe12f
--- /dev/null
+++ b/banyand/query/processor_test.go
@@ -0,0 +1,333 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+	"context"
+	"os"
+	"path"
+	"testing"
+	"time"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/event"
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/series"
+	"github.com/apache/skywalking-banyandb/banyand/series/trace"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var interval = time.Millisecond * 500
+
+type entityValue struct {
+	seriesID   string
+	entityID   string
+	dataBinary []byte
+	ts         time.Time
+	items      []interface{}
+}
+
+func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
+	// Bootstrap logger system
+	tester.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	}))
+
+	// Init `Discovery` module
+	repo, err := discovery.NewServiceRepo(context.Background())
+	tester.NoError(err)
+	tester.NotNil(repo)
+
+	// Init `Database` module
+	db, err := storage.NewDB(context.TODO(), repo)
+	tester.NoError(err)
+	uuid, err := googleUUID.NewUUID()
+	tester.NoError(err)
+	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+
+	// Init `Trace` module
+	traceSvc, err := trace.NewService(context.TODO(), db, repo)
+	tester.NoError(err)
+
+	// Init `Query` module
+	executor, err := NewExecutor(context.TODO(), repo, nil, traceSvc, traceSvc)
+	tester.NoError(err)
+
+	// :PreRun:
+	// 1) TraceSeries,
+	// 2) Database
+	err = traceSvc.PreRun()
+	tester.NoError(err)
+
+	err = db.PreRun()
+	tester.NoError(err)
+
+	err = executor.PreRun()
+	tester.NoError(err)
+
+	return repo, traceSvc, func() {
+		db.GracefulStop()
+		_ = os.RemoveAll(rootPath)
+	}
+}
+
+func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service) {
+	metadata := common.Metadata{
+		Spec: &v1.Metadata{
+			Name:  "sw",
+			Group: "default",
+		},
+	}
+
+	entityValues := []entityValue{
+		{
+			ts:         baseTs,
+			seriesID:   "webapp_10.0.0.1",
+			entityID:   "1",
+			dataBinary: []byte{11},
+			items: []interface{}{
+				"trace_id-xxfff.111323",
+				0,
+				"webapp_id",
+				"10.0.0.1_id",
+				"/home_id",
+				"webapp",
+				"10.0.0.1",
+				"/home",
+				300,
+				1622933202000000000,
+			},
+		},
+		{
+			ts:         baseTs.Add(interval),
+			seriesID:   "gateway_10.0.0.2",
+			entityID:   "2",
+			dataBinary: []byte{12},
+			items: []interface{}{
+				"trace_id-xxfff.111323a",
+				1,
+			},
+		},
+		{
+			ts:         baseTs.Add(interval * 2),
+			seriesID:   "httpserver_10.0.0.3",
+			entityID:   "3",
+			dataBinary: []byte{13},
+			items: []interface{}{
+				"trace_id-xxfff.111323",
+				1,
+				"httpserver_id",
+				"10.0.0.3_id",
+				"/home_id",
+				"httpserver",
+				"10.0.0.3",
+				"/home",
+				300,
+				1622933202000000000,
+				"GET",
+				"200",
+			},
+		},
+		{
+			ts:         baseTs.Add(interval * 3),
+			seriesID:   "database_10.0.0.4",
+			entityID:   "4",
+			dataBinary: []byte{14},
+			items: []interface{}{
+				"trace_id-xxfff.111323",
+				0,
+				"database_id",
+				"10.0.0.4_id",
+				"/home_id",
+				"database",
+				"10.0.0.4",
+				"/home",
+				300,
+				1622933202000000000,
+				nil,
+				nil,
+				"MySQL",
+				"10.1.1.4",
+			},
+		},
+		{
+			ts:         baseTs.Add(interval * 4),
+			seriesID:   "mq_10.0.0.5",
+			entityID:   "5",
+			dataBinary: []byte{15},
+			items: []interface{}{
+				"trace_id-zzpp.111323",
+				0,
+				"mq_id",
+				"10.0.0.5_id",
+				"/home_id",
+				"mq",
+				"10.0.0.5",
+				"/home",
+				300,
+				1622933202000000000,
+				nil,
+				nil,
+				nil,
+				nil,
+				"test_topic",
+				"10.0.0.5",
+			},
+		},
+		{
+			ts:         baseTs.Add(interval * 5),
+			seriesID:   "database_10.0.0.6",
+			entityID:   "6",
+			dataBinary: []byte{16},
+			items: []interface{}{
+				"trace_id-zzpp.111323",
+				1,
+				"database_id",
+				"10.0.0.6_id",
+				"/home_id",
+				"database",
+				"10.0.0.6",
+				"/home",
+				300,
+				1622933202000000000,
+				nil,
+				nil,
+				"MySQL",
+				"10.1.1.6",
+			},
+		},
+		{
+			ts:         baseTs.Add(interval * 6),
+			seriesID:   "mq_10.0.0.7",
+			entityID:   "7",
+			dataBinary: []byte{17},
+			items: []interface{}{
+				"trace_id-zzpp.111323",
+				0,
+				"nq_id",
+				"10.0.0.7_id",
+				"/home_id",
+				"mq",
+				"10.0.0.7",
+				"/home",
+				300,
+				1622933202000000000,
+				nil,
+				nil,
+				nil,
+				nil,
+				"test_topic",
+				"10.0.0.7",
+			},
+		},
+	}
+
+	for _, ev := range entityValues {
+		ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
+		tester.True(ok)
+		tester.NoError(err)
+	}
+}
+
+func TestQueryProcessor(t *testing.T) {
+	tester := require.New(t)
+
+	// setup services
+	repo, traceSvc, gracefulStop := setupServices(tester)
+	defer gracefulStop()
+
+	baseTs := time.Now()
+	setupData(tester, baseTs, traceSvc)
+
+	tests := []struct {
+		// name of the test case
+		name string
+		// queryGenerator is used to generate a Query
+		queryGenerator func(baseTs time.Time) *v1.QueryRequest
+		// wantLen is the length of entities expected to return
+		wantLen int
+	}{
+		{
+			name: "query given timeRange is out of the time range of data",
+			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
+					Projection("trace_id").
+					Build()
+			},
+			wantLen: 0,
+		},
+		{
+			name: "query TraceID given timeRange includes the time range of data",
+			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					Fields("trace_id", "=", "trace_id-zzpp.111323").
+					TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
+					Projection("trace_id").
+					Build()
+			},
+			wantLen: 3,
+		},
+		{
+			name: "query TraceID given timeRange includes the time range of data but limit to 1",
+			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(1).
+					Offset(0).
+					Metadata("default", "sw").
+					Fields("trace_id", "=", "trace_id-zzpp.111323").
+					TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
+					Projection("trace_id").
+					Build()
+			},
+			wantLen: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			singleTester := require.New(t)
+			now := time.Now()
+			m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs))
+			f, err := repo.Publish(event.TopicQueryEvent, m)
+			singleTester.NoError(err)
+			singleTester.NotNil(f)
+			msg, err := f.Get()
+			singleTester.NoError(err)
+			singleTester.NotNil(msg)
+			// TODO: better error response
+			singleTester.NotNil(msg.Data())
+			singleTester.Len(msg.Data(), tt.wantLen)
+		})
+	}
+}
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 56d0b0f..3c3f9a8 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -20,8 +20,10 @@
 import (
 	"context"
 
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/series"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -29,6 +31,12 @@
 	run.PreRunner
 }
 
-func NewExecutor(ctx context.Context, idx index.Repo, s series.UniModel) (Executor, error) {
-	return nil, nil
+func NewExecutor(_ context.Context, serviceRepo discovery.ServiceRepo, indexRepo index.Repo, uniModel series.UniModel, schemaRepo series.SchemaRepo) (Executor, error) {
+	return &queryProcessor{
+		Repo:        indexRepo,
+		UniModel:    uniModel,
+		schemaRepo:  schemaRepo,
+		serviceRepo: serviceRepo,
+		logger:      logger.GetLogger("query"),
+	}, nil
 }
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index e2cd6af..1e5f861 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,15 +21,15 @@
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type Queue interface {
 	run.Config
 	run.PreRunner
-	bus2.Subscriber
-	bus2.Publisher
+	bus.Subscriber
+	bus.Publisher
 }
 
 func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
diff --git a/banyand/series/series.go b/banyand/series/series.go
index 9dc8263..f10b60e 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -20,6 +20,7 @@
 
 import (
 	"context"
+	"time"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
@@ -54,6 +55,8 @@
 	FetchEntity(traceSeries common.Metadata, shardID uint, chunkIDs posting2.List, opt ScanOptions) ([]data.Entity, error)
 	//ScanEntity returns data.Entity between a duration by ScanOptions
 	ScanEntity(traceSeries common.Metadata, startTime, endTime uint64, opt ScanOptions) ([]data.Entity, error)
+	// Write entity to the given traceSeries
+	Write(traceSeries common.Metadata, ts time.Time, seriesID, entityID string, dataBinary []byte, items ...interface{}) (bool, error)
 }
 
 //UniModel combines Trace, Metric and Log repositories into a union interface
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 507f380..51f1603 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -19,11 +19,14 @@
 
 import (
 	"context"
+	"os"
+	"path"
 	"sort"
 	"strings"
 	"testing"
 	"time"
 
+	googleUUID "github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -56,7 +59,10 @@
 	_ = logger.Bootstrap()
 	db, err := storage.NewDB(context.TODO(), nil)
 	assert.NoError(t, err)
-	assert.NoError(t, db.FlagSet().Parse(nil))
+	uuid, err := googleUUID.NewUUID()
+	assert.NoError(t, err)
+	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+	assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 	svc, err := NewService(context.TODO(), db, nil)
 	assert.NoError(t, err)
 	assert.NoError(t, svc.PreRun())
@@ -69,7 +75,10 @@
 		},
 	})
 	assert.NoError(t, err)
-	return ts, db.GracefulStop
+	return ts, func() {
+		db.GracefulStop()
+		_ = os.RemoveAll(rootPath)
+	}
 }
 
 type seriesEntity struct {
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index fd83660..2551813 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -19,7 +19,6 @@
 
 import (
 	"context"
-	"fmt"
 	"strconv"
 	"time"
 
@@ -35,13 +34,14 @@
 	"github.com/apache/skywalking-banyandb/banyand/series/schema"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
 	"github.com/apache/skywalking-banyandb/pkg/pb"
 	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 const (
-	traceSeriesIDTemp = "%s:%s"
 	// KV stores
 	chunkIDMapping = "chunkIDMapping"
 	startTimeIndex = "startTimeIndex"
@@ -111,7 +111,7 @@
 			return errTS
 		}
 		s.db.Register(ts)
-		id := fmt.Sprintf(traceSeriesIDTemp, ts.name, ts.group)
+		id := formatTraceSeriesID(ts.name, ts.group)
 		s.schemaMap[id] = ts
 		s.l.Info().Str("id", id).Msg("initialize Trace series")
 	}
@@ -184,7 +184,7 @@
 }
 
 func (s *service) getSeries(traceSeries common.Metadata) (*traceSeries, error) {
-	id := getTraceSeriesID(traceSeries)
+	id := formatTraceSeriesID(traceSeries.Spec.GetName(), traceSeries.Spec.GetGroup())
 	s.l.Debug().Str("id", id).Msg("got Trace series")
 	ts, ok := s.schemaMap[id]
 	if !ok {
@@ -193,8 +193,30 @@
 	return ts, nil
 }
 
-func getTraceSeriesID(traceSeries common.Metadata) string {
-	return fmt.Sprintf(traceSeriesIDTemp, traceSeries.Spec.GetName(), traceSeries.Spec.GetGroup())
+func (s *service) Write(traceSeriesMetadata common.Metadata, ts time.Time, seriesID, entityID string, dataBinary []byte, items ...interface{}) (bool, error) {
+	traceSeries, err := s.getSeries(traceSeriesMetadata)
+	if err != nil {
+		return false, err
+	}
+
+	ev := pb.NewEntityValueBuilder().
+		DataBinary(dataBinary).
+		EntityID(entityID).
+		Fields(items...).
+		Timestamp(ts).
+		Build()
+
+	seriesIDBytes := []byte(seriesID)
+	shardID := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
+
+	_, err = traceSeries.Write(common.SeriesID(convert.Hash(seriesIDBytes)), shardID, data.EntityValue{
+		EntityValue: ev,
+	})
+	return err == nil, err
+}
+
+func formatTraceSeriesID(name, group string) string {
+	return name + ":" + group
 }
 
 type traceSeries struct {
diff --git a/go.mod b/go.mod
index fe24d89..5ad72a0 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@
 	github.com/golang/mock v1.5.0
 	github.com/golang/protobuf v1.5.2
 	github.com/google/go-cmp v0.5.6
+	github.com/google/uuid v1.3.0
 	github.com/klauspost/compress v1.13.1 // indirect
 	github.com/oklog/run v1.1.0
 	github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 42f0ca5..ce8fd5c 100644
--- a/go.sum
+++ b/go.sum
@@ -173,6 +173,8 @@
 github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
diff --git a/pkg/query/logical/analyzer.go b/pkg/query/logical/analyzer.go
index bb9296f..585a746 100644
--- a/pkg/query/logical/analyzer.go
+++ b/pkg/query/logical/analyzer.go
@@ -67,8 +67,9 @@
 	}
 
 	s := &schema{
-		indexRule: indexRule,
-		fieldMap:  make(map[string]*fieldSpec),
+		traceSeries: traceSeries.Spec,
+		indexRule:   indexRule,
+		fieldMap:    make(map[string]*fieldSpec),
 	}
 
 	// generate the schema of the fields for the traceSeries
@@ -94,21 +95,23 @@
 		useIndexScan := false
 		var fieldExprs []Expr
 		traceState := series.TraceStateDefault
+
 		for _, pairQuery := range criteria.GetFields() {
 			op := pairQuery.GetOp()
 			typedPair := pairQuery.GetCondition()
 			switch v := typedPair.GetTyped().(type) {
 			case *apiv1.TypedPair_StrPair:
 				// check special field `trace_id`
-				if v.StrPair.GetKey() == "trace_id" {
-					return TraceIDFetch(v.StrPair.GetValues()[0], traceMetadata, s), nil
+				if v.StrPair.GetKey() == s.TraceIDFieldName() {
+					plan = TraceIDFetch(v.StrPair.GetValues()[0], traceMetadata, projStr...)
+					break
 				}
 				useIndexScan = true
 				lit := parseStrLiteral(v.StrPair)
 				fieldExprs = append(fieldExprs, binaryOpFactory[op](NewFieldRef(v.StrPair.GetKey()), lit))
 			case *apiv1.TypedPair_IntPair:
 				// check special field `state`
-				if v.IntPair.GetKey() == "state" {
+				if v.IntPair.GetKey() == s.TraceStateFieldName() {
 					traceState = series.TraceState(v.IntPair.GetValues()[0])
 					continue
 				}
@@ -120,11 +123,14 @@
 			}
 		}
 
-		// first check if we can use index-scan
-		if useIndexScan {
-			plan = IndexScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, fieldExprs, traceState, projStr...)
-		} else {
-			plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, traceState, projStr...)
+		// if plan is already assigned, skip
+		if plan == nil {
+			// first check if we can use index-scan
+			if useIndexScan {
+				plan = IndexScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, fieldExprs, traceState, projStr...)
+			} else {
+				plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, traceState, projStr...)
+			}
 		}
 	} else {
 		plan = TableScan(timeRange.GetBegin().AsTime().UnixNano(), timeRange.GetEnd().AsTime().UnixNano(), traceMetadata, series.TraceStateDefault, projStr...)
diff --git a/pkg/query/logical/analyzer_test.go b/pkg/query/logical/analyzer_test.go
index 485cf84..eabd79c 100644
--- a/pkg/query/logical/analyzer_test.go
+++ b/pkg/query/logical/analyzer_test.go
@@ -139,7 +139,8 @@
 	assert.NoError(err)
 	assert.NotNil(plan)
 
-	correctPlan := logical.TraceIDFetch("123", metadata, schema)
+	correctPlan, err := logical.TraceIDFetch("123", metadata).Analyze(schema)
+	assert.NoError(err)
 	assert.NotNil(correctPlan)
 	cmp.Equal(plan, correctPlan)
 }
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 236897f..f77bb54 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -36,6 +36,7 @@
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/pkg/pb"
 	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 	"github.com/apache/skywalking-banyandb/pkg/query/executor"
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -150,21 +151,39 @@
 	return ec
 }
 
-//TODO: pass correct shardID
 func (f *mockDataFactory) MockIndexScan(startTime, endTime time.Time, indexMatches ...*indexMatcher) executor.ExecutionContext {
 	ec := executor.NewMockExecutionContext(f.ctrl)
+	usedShards := make(map[uint]posting.List)
+
 	for _, im := range indexMatches {
 		ec.
 			EXPECT().
-			Search(*f.traceMetadata, uint(0), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), im).
+			Search(*f.traceMetadata, gomock.Eq(im.shardID), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), gomock.Any(), im).
 			Return(im.chunkIDs, nil)
+
+		if list, ok := usedShards[im.shardID]; ok {
+			_ = list.Intersect(im.chunkIDs)
+		} else {
+			usedShards[im.shardID] = im.chunkIDs
+		}
+
+		ec.
+			EXPECT().
+			Search(*f.traceMetadata, gomock.Not(gomock.Eq(im.shardID)), uint64(startTime.UnixNano()), uint64(endTime.UnixNano()), gomock.Any(), im).
+			Return(roaring.NewPostingList(), nil)
 	}
-	ec.
-		EXPECT().
-		FetchEntity(*f.traceMetadata, uint(0), gomock.Any(), gomock.Any()).
-		DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs posting.List, _ series.ScanOptions) ([]data.Entity, error) {
-			return GenerateEntities(GeneratorFromArray(chunkIDs)), nil
-		})
+
+	for shardID := uint(0); shardID < uint(f.s.ShardNumber()); shardID++ {
+		if chunkList, ok := usedShards[shardID]; ok && chunkList.Len() > 0 {
+			ec.
+				EXPECT().
+				FetchEntity(*f.traceMetadata, shardID, gomock.Any(), gomock.Any()).
+				DoAndReturn(func(_ common.Metadata, _ uint, chunkIDs posting.List, _ series.ScanOptions) ([]data.Entity, error) {
+					return GenerateEntities(GeneratorFromArray(chunkIDs)), nil
+				})
+		}
+	}
+
 	return ec
 }
 
@@ -193,6 +212,7 @@
 
 type indexMatcher struct {
 	key      string
+	shardID  uint
 	chunkIDs posting.List
 }
 
@@ -211,9 +231,10 @@
 	return fmt.Sprintf("is search for key %s", i.key)
 }
 
-func newIndexMatcher(key string, chunkIDs posting.List) *indexMatcher {
+func newIndexMatcher(key string, shardID uint, chunkIDs posting.List) *indexMatcher {
 	return &indexMatcher{
 		key:      key,
+		shardID:  shardID,
 		chunkIDs: chunkIDs,
 	}
 }
diff --git a/pkg/query/logical/plan_execution_test.go b/pkg/query/logical/plan_execution_test.go
index 199b029..b4fa588 100644
--- a/pkg/query/logical/plan_execution_test.go
+++ b/pkg/query/logical/plan_execution_test.go
@@ -128,7 +128,8 @@
 
 	traceID := "asdf1234"
 
-	p := logical.TraceIDFetch(traceID, m, s)
+	p, err := logical.TraceIDFetch(traceID, m).Analyze(s)
+	assert.NoError(err)
 	assert.NotNil(p)
 	f := newMockDataFactory(ctrl, m, s, 10)
 	entities, err := p.Execute(f.MockTraceIDFetch(traceID))
@@ -156,7 +157,7 @@
 			unresolvedPlan: logical.IndexScan(st.UnixNano(), et.UnixNano(), m, []logical.Expr{
 				logical.Eq(logical.NewFieldRef("http.method"), logical.Str("GET")),
 			}, series.TraceStateDefault),
-			indexMatchers: []*indexMatcher{newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))},
+			indexMatchers: []*indexMatcher{newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))},
 			wantLength:    10,
 		},
 		{
@@ -166,8 +167,8 @@
 				logical.Eq(logical.NewFieldRef("service_id"), logical.Str("app")),
 			}, series.TraceStateDefault),
 			indexMatchers: []*indexMatcher{
-				newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
-				newIndexMatcher("service_id", roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)),
+				newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+				newIndexMatcher("service_id", 0, roaring.NewPostingListWithInitialData(1, 3, 5, 7, 9)),
 			},
 			wantLength: 5,
 		},
@@ -178,8 +179,8 @@
 				logical.Eq(logical.NewFieldRef("service_id"), logical.Str("app")),
 			}, series.TraceStateDefault),
 			indexMatchers: []*indexMatcher{
-				newIndexMatcher("http.method", roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
-				newIndexMatcher("service_id", roaring.NewPostingList()),
+				newIndexMatcher("http.method", 0, roaring.NewPostingListWithInitialData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
+				newIndexMatcher("service_id", 0, roaring.NewPostingList()),
 			},
 			wantLength: 0,
 		},
diff --git a/pkg/query/logical/plan_indexscan.go b/pkg/query/logical/plan_indexscan.go
index e50249a..2a1f12f 100644
--- a/pkg/query/logical/plan_indexscan.go
+++ b/pkg/query/logical/plan_indexscan.go
@@ -30,7 +30,7 @@
 	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/pkg/posting"
-	executor2 "github.com/apache/skywalking-banyandb/pkg/query/executor"
+	"github.com/apache/skywalking-banyandb/pkg/query/executor"
 )
 
 var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
@@ -107,31 +107,54 @@
 	traceState          series.TraceState
 }
 
-func (i *indexScan) Execute(ec executor2.ExecutionContext) ([]data.Entity, error) {
-	var chunkSet posting.List
-	for _, exprs := range i.conditionMap {
-		// TODO: Discuss which metadata should be used!
-		// 1) traceSeries Metadata: indirect mapping
-		// 2) indexRule Metadata: cannot uniquely determine traceSeries
-		// TODO: should pass correct shardID
-		chunks, err := ec.Search(*i.traceMetadata, 0, uint64(i.startTime), uint64(i.endTime), convertToConditions(exprs))
+func (i *indexScan) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
+	dataEntities := make([]data.Entity, 0)
+
+	// iterate over shards
+	// TODO: we have to push down Limit and Offset to set a threshold to IndexSearch operations
+	for shardID := uint32(0); shardID < i.schema.ShardNumber(); shardID++ {
+		var chunkSet posting.List
+
+		// first collect all chunkIDs via indexes
+		for idxObj, exprs := range i.conditionMap {
+			// 1) traceSeries Metadata -> IndexObject
+			// 2) IndexObject -> Fields
+			chunks, err := ec.Search(*i.traceMetadata, uint(shardID), uint64(i.startTime), uint64(i.endTime), idxObj.GetName(), convertToConditions(exprs))
+			if err != nil {
+				return nil, err
+			}
+			if chunkSet == nil {
+				// chunkSet is nil before the first assignment
+				chunkSet = chunks
+			} else {
+				// afterwards, it must not be nil
+				_ = chunkSet.Intersect(chunks)
+				// stop loop if chunkSet is empty
+				if chunkSet.Len() == 0 {
+					continue
+				}
+			}
+		}
+
+		if chunkSet == nil || chunkSet.Len() == 0 {
+			continue
+		}
+
+		// fetch entities with chunkIDs
+		entitiesFromSingleShard, err := ec.FetchEntity(*i.traceMetadata, uint(shardID), chunkSet, series.ScanOptions{
+			Projection: i.projectionFields,
+			State:      i.traceState,
+		})
+
 		if err != nil {
 			return nil, err
 		}
-		if chunkSet == nil {
-			// chunkSet is nil before the first assignment
-			chunkSet = chunks
-		} else {
-			// afterwards, it must not be nil
-			_ = chunkSet.Intersect(chunks)
-		}
+
+		// merge results
+		dataEntities = append(dataEntities, entitiesFromSingleShard...)
 	}
 
-	//TODO: pass correct shardID
-	return ec.FetchEntity(*i.traceMetadata, 0, chunkSet, series.ScanOptions{
-		Projection: i.projectionFields,
-		State:      i.traceState,
-	})
+	return dataEntities, nil
 }
 
 func (i *indexScan) String() string {
diff --git a/pkg/query/logical/plan_tablescan.go b/pkg/query/logical/plan_tablescan.go
index 0480faa..6adf6b5 100644
--- a/pkg/query/logical/plan_tablescan.go
+++ b/pkg/query/logical/plan_tablescan.go
@@ -47,11 +47,10 @@
 func (u *unresolvedTableScan) Analyze(schema Schema) (Plan, error) {
 	if u.projectionFields == nil || len(u.projectionFields) == 0 {
 		return &tableScan{
-			startTime:           u.startTime,
-			endTime:             u.endTime,
-			projectionFieldRefs: nil,
-			schema:              schema,
-			traceMetadata:       u.traceMetadata,
+			startTime:     u.startTime,
+			endTime:       u.endTime,
+			schema:        schema,
+			traceMetadata: u.traceMetadata,
 		}, nil
 	}
 
diff --git a/pkg/query/logical/plan_traceid.go b/pkg/query/logical/plan_traceid.go
index 4f87a6e..cc4255e 100644
--- a/pkg/query/logical/plan_traceid.go
+++ b/pkg/query/logical/plan_traceid.go
@@ -21,6 +21,7 @@
 	"fmt"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
@@ -28,20 +29,51 @@
 	"github.com/apache/skywalking-banyandb/pkg/query/executor"
 )
 
+var _ UnresolvedPlan = (*unresolvedTraceIDFetch)(nil)
 var _ Plan = (*traceIDFetch)(nil)
 
-type traceIDFetch struct {
-	metadata *common.Metadata
-	schema   Schema
-	traceID  string
+type unresolvedTraceIDFetch struct {
+	metadata         *common.Metadata
+	traceID          string
+	projectionFields []string
 }
 
-func (t *traceIDFetch) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
-	traceData, err := ec.FetchTrace(*t.metadata, t.traceID, series.ScanOptions{})
+func (t *unresolvedTraceIDFetch) Analyze(s Schema) (Plan, error) {
+	if t.projectionFields == nil || len(t.projectionFields) == 0 {
+		return &traceIDFetch{
+			metadata: t.metadata,
+			schema:   s,
+			traceID:  t.traceID,
+		}, nil
+	}
+
+	if s == nil {
+		return nil, errors.Wrap(ErrInvalidSchema, "nil")
+	}
+
+	fieldRefs, err := s.CreateRef(t.projectionFields...)
 	if err != nil {
 		return nil, err
 	}
-	return traceData.Entities, nil
+	return &traceIDFetch{
+		projectionFields:    t.projectionFields,
+		projectionFieldRefs: fieldRefs,
+		schema:              s,
+		traceID:             t.traceID,
+		metadata:            t.metadata,
+	}, nil
+}
+
+func (t *unresolvedTraceIDFetch) Type() PlanType {
+	return PlanTraceIDFetch
+}
+
+type traceIDFetch struct {
+	metadata            *common.Metadata
+	traceID             string
+	projectionFields    []string
+	projectionFieldRefs []*FieldRef
+	schema              Schema
 }
 
 func (t *traceIDFetch) String() string {
@@ -52,14 +84,14 @@
 	)
 }
 
-func (t *traceIDFetch) Type() PlanType {
-	return PlanTraceIDFetch
-}
-
 func (t *traceIDFetch) Children() []Plan {
 	return []Plan{}
 }
 
+func (t *traceIDFetch) Type() PlanType {
+	return PlanTraceIDFetch
+}
+
 func (t *traceIDFetch) Schema() Schema {
 	return t.schema
 }
@@ -74,10 +106,20 @@
 		cmp.Equal(t.metadata, other.metadata)
 }
 
-func TraceIDFetch(traceID string, metadata *common.Metadata, schema Schema) Plan {
-	return &traceIDFetch{
-		metadata: metadata,
-		schema:   schema,
-		traceID:  traceID,
+func (t *traceIDFetch) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
+	traceData, err := ec.FetchTrace(*t.metadata, t.traceID, series.ScanOptions{
+		Projection: t.projectionFields,
+	})
+	if err != nil {
+		return nil, err
+	}
+	return traceData.Entities, nil
+}
+
+func TraceIDFetch(traceID string, metadata *common.Metadata, projection ...string) UnresolvedPlan {
+	return &unresolvedTraceIDFetch{
+		metadata:         metadata,
+		traceID:          traceID,
+		projectionFields: projection,
 	}
 }
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 8418fb4..00542a5 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -31,6 +31,9 @@
 	CreateRef(names ...string) ([]*FieldRef, error)
 	Map(refs ...*FieldRef) Schema
 	Equal(Schema) bool
+	ShardNumber() uint32
+	TraceIDFieldName() string
+	TraceStateFieldName() string
 }
 
 type fieldSpec struct {
@@ -45,8 +48,17 @@
 var _ Schema = (*schema)(nil)
 
 type schema struct {
-	indexRule apischema.IndexRule
-	fieldMap  map[string]*fieldSpec
+	traceSeries *apiv1.TraceSeries
+	indexRule   apischema.IndexRule
+	fieldMap    map[string]*fieldSpec
+}
+
+func (s *schema) TraceIDFieldName() string {
+	return s.traceSeries.GetReservedFieldsMap().GetTraceId()
+}
+
+func (s *schema) TraceStateFieldName() string {
+	return s.traceSeries.GetReservedFieldsMap().GetState().GetField()
 }
 
 // IndexDefined checks whether the field given is indexed
@@ -100,11 +112,16 @@
 		return nil
 	}
 	newS := &schema{
-		indexRule: s.indexRule,
-		fieldMap:  make(map[string]*fieldSpec),
+		traceSeries: s.traceSeries,
+		indexRule:   s.indexRule,
+		fieldMap:    make(map[string]*fieldSpec),
 	}
 	for _, ref := range refs {
 		newS.fieldMap[ref.name] = ref.spec
 	}
 	return newS
 }
+
+func (s *schema) ShardNumber() uint32 {
+	return s.traceSeries.Shard.Number
+}