[Optional] Support trace/span on ElasticSearch to use traceIDs routing. (#10282)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 7f915a8..8abd796 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -80,6 +80,7 @@
 * Add Python HBase happybase module component ID(94).
 * Fix gRPC alarm cannot update settings from dynamic configuration source.
 * Add Python Websocket module component ID(7018).
+* [**Breaking Change**] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing.
 
 #### UI
 
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 2dfd67a..5abe9fa 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -107,6 +107,8 @@
     # Set it to `true` could shard metrics indices into multi-physical indices
     # as same as the versions(one index template per metric/meter aggregation function) before 9.2.0.
     logicSharding: ${SW_STORAGE_ES_LOGIC_SHARDING:false}
+    # Custom routing can reduce the impact of searches. Instead of having to fan out a search request to all the shards in an index, the request can be sent to just the shard that matches the specific routing value (or values).
+    enableCustomRouting: ${SW_STORAGE_ES_ENABLE_CUSTOM_ROUTING:false}
 ```
 
 ### ElasticSearch With Https SSL Encrypting communications.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index b5ceed2..6694de9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -32,6 +32,7 @@
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -67,6 +68,7 @@
     @Getter
     @Column(columnName = TRACE_ID, length = 150)
     @BanyanDB.GlobalIndex
+    @ElasticSearch.Routing
     private String traceId;
     @Setter
     @Getter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
index b592e55..765ab38 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
@@ -23,6 +23,7 @@
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 
 /**
  * ElasticSearch annotation is a holder including all annotations for ElasticSearch storage
@@ -97,4 +98,12 @@
         String columnAlias();
 
     }
+
+    /**
+     * Routing defines a field of {@link Record} to control the sharding policy.
+     */
+    @Target(ElementType.FIELD)
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Routing {
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
index 2ff5264..8be7062 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
@@ -40,6 +40,8 @@
 
     private final boolean isKeyword;
 
+    private final boolean isRouting;
+
     public boolean needMatchQuery() {
         return analyzer != null;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java
new file mode 100644
index 0000000..cdad596
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.model;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class ElasticSearchModelExtension {
+
+    /**
+     * Routing defines a field of {@link Record} to control the sharding policy.
+     */
+    @Getter
+    private Optional<String> routing = Optional.empty();
+
+    public void setRouting(String modelName, List<ModelColumn> modelColumns) throws IllegalStateException {
+        if (CollectionUtils.isEmpty(modelColumns)) {
+            return;
+        }
+
+        List<ModelColumn> routingColumns = modelColumns.stream()
+                .filter(col -> col.getElasticSearchExtension().isRouting())
+                .collect(Collectors.toList());
+
+        int size = routingColumns.size();
+        if (size > 1) {
+            throw new IllegalStateException(modelName + "'s routing field is duplicated "
+                    + routingColumns.stream()
+                            .map(col -> col.getColumnName().toString())
+                            .collect(Collectors.joining(",", "[", "]")));
+        }
+
+        if (size == 1) {
+            routing = Optional.of(routingColumns.get(0).getColumnName().getName());
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 23514dd..c346800 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -40,6 +40,7 @@
     private final boolean timeRelativeID;
     private final SQLDatabaseModelExtension sqlDBModelExtension;
     private final BanyanDBModelExtension banyanDBModelExtension;
+    private final ElasticSearchModelExtension elasticSearchModelExtension;
 
     public Model(final String name,
                  final List<ModelColumn> columns,
@@ -50,7 +51,8 @@
                  final Class<?> streamClass,
                  boolean timeRelativeID,
                  final SQLDatabaseModelExtension sqlDBModelExtension,
-                 final BanyanDBModelExtension banyanDBModelExtension) {
+                 final BanyanDBModelExtension banyanDBModelExtension,
+                 final ElasticSearchModelExtension elasticSearchModelExtension) {
         this.name = name;
         this.columns = columns;
         this.scopeId = scopeId;
@@ -62,5 +64,6 @@
         this.timeRelativeID = timeRelativeID;
         this.sqlDBModelExtension = sqlDBModelExtension;
         this.banyanDBModelExtension = banyanDBModelExtension;
+        this.elasticSearchModelExtension = elasticSearchModelExtension;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index bd7b5d4..109e176 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -60,6 +60,7 @@
         ShardingKeyChecker checker = new ShardingKeyChecker();
         SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
         BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
+        ElasticSearchModelExtension elasticSearchModelExtension = new ElasticSearchModelExtension();
         retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
         // Add extra column for additional entities
         if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
@@ -96,7 +97,9 @@
         if (aClass.isAnnotationPresent(BanyanDB.StoreIDAsTag.class)) {
             banyanDBModelExtension.setStoreIDTag(true);
         }
-
+        // Set routing rules for ElasticSearch
+        elasticSearchModelExtension.setRouting(storage.getModelName(), modelColumns);
+        
         checker.check(storage.getModelName());
 
         Model model = new Model(
@@ -109,7 +112,8 @@
             aClass,
             storage.isTimeRelativeID(),
             sqlDBModelExtension,
-            banyanDBModelExtension
+            banyanDBModelExtension,
+            elasticSearchModelExtension
         );
 
         this.followColumnNameRules(model);
@@ -191,10 +195,12 @@
                     ElasticSearch.MatchQuery.class);
                 final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
                 final ElasticSearch.Keyword keywordColumn = field.getAnnotation(ElasticSearch.Keyword.class);
+                final ElasticSearch.Routing routingColumn = field.getAnnotation(ElasticSearch.Routing.class);
                 ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension(
                     elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
                     elasticSearchColumn == null ? null : elasticSearchColumn.columnAlias(),
-                    keywordColumn != null
+                    keywordColumn != null,
+                    routingColumn != null
                 );
 
                 // BanyanDB extension
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 0d58266..f8305dc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -36,6 +36,7 @@
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -84,6 +85,7 @@
     @Column(columnName = TRACE_ID)
     @SQLDatabase.AdditionalEntity(additionalTables = {ADDITIONAL_QUERY_TABLE}, reserveOriginalColumns = true)
     @BanyanDB.SeriesID(index = 0)
+    @ElasticSearch.Routing
     private String traceId;
     @Setter
     @Getter
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index 5cd1666..c50cc7a 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -31,7 +31,7 @@
                                              false, false, true, 0,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
                                              new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -40,7 +40,7 @@
         column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
                                  new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -50,7 +50,7 @@
         column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
                                  new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
         );
         Assert.assertEquals(false, column.isStorageOnly());
@@ -63,7 +63,7 @@
                                              true, false, true, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
                                              new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
         );
     }
@@ -74,7 +74,7 @@
                                              true, true, false, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
                                              new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
         );
     }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 7167c02..346dbe1 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -348,8 +348,13 @@
 
     public IndexRequestWrapper prepareInsert(String indexName, String id,
                                              Map<String, Object> source) {
+        return prepareInsert(indexName, id, Optional.empty(), source);
+    }
+
+    public IndexRequestWrapper prepareInsert(String indexName, String id, Optional<String> routingValue,
+                                             Map<String, Object> source) {
         indexName = indexNameConverter.apply(indexName);
-        return new IndexRequestWrapper(indexName, TYPE, id, source);
+        return new IndexRequestWrapper(indexName, TYPE, id, routingValue, source);
     }
 
     public UpdateRequestWrapper prepareUpdate(String indexName, String id,
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
index e6bd63e..ecda4fa 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
@@ -18,6 +18,8 @@
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
 import java.util.Map;
+import java.util.Optional;
+
 import lombok.Getter;
 import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -28,10 +30,17 @@
 
     public IndexRequestWrapper(String index, String type, String id,
                                Map<String, ?> source) {
+        this(index, type, id, Optional.empty(), source);
+    }
+
+    public IndexRequestWrapper(String index, String type, String id,
+                               Optional<String> routing,
+                               Map<String, ?> source) {
         request = IndexRequest.builder()
                               .index(index)
                               .type(type)
                               .id(id)
+                              .routing(routing)
                               .doc(source)
                               .build();
     }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
index 818c3b9..9d9e14b 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
@@ -18,6 +18,8 @@
 package org.apache.skywalking.library.elasticsearch.requests;
 
 import java.util.Map;
+import java.util.Optional;
+
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
@@ -29,5 +31,10 @@
     private final String index;
     private final String type;
     private final String id;
+    /**
+     * The routing value of the request.
+     */
+    @Builder.Default
+    private final Optional<String> routing = Optional.empty();
     private final Map<String, ?> doc;
 }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
index 467a444..c25e9d6 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
@@ -38,6 +38,9 @@
                 gen.writeStringField("_index", value.getIndex());
                 gen.writeStringField("_type", value.getType());
                 gen.writeStringField("_id", value.getId());
+                if (value.getRouting().isPresent()) {
+                    gen.writeStringField("routing", value.getRouting().get());
+                }
             }
             gen.writeEndObject();
         }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
index 87e2dfd..d4b1019 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
@@ -37,6 +37,9 @@
             {
                 gen.writeStringField("_index", value.getIndex());
                 gen.writeStringField("_id", value.getId());
+                if (value.getRouting().isPresent()) {
+                    gen.writeStringField("routing", value.getRouting().get());
+                }
             }
             gen.writeEndObject();
         }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
index 26a9715..f9a070d 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
@@ -19,6 +19,8 @@
 
 package org.apache.skywalking.library.elasticsearch.requests.search;
 
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import java.time.Duration;
 import java.util.HashMap;
@@ -31,6 +33,7 @@
     private static final String ALLOW_NO_INDICES = "allow_no_indices";
     private static final String EXPAND_WILDCARDS = "expand_wildcards";
     private static final String SCROLL = "scroll";
+    private static final String ROUTING = "routing";
 
     private final Map<String, Object> params = new HashMap<>();
 
@@ -59,6 +62,20 @@
         return this;
     }
 
+    public SearchParams routing(String routing) {
+        checkArgument(StringUtil.isNotBlank(routing),
+                "routing must be not blank");
+        params.put(ROUTING, routing);
+        return this;
+    }
+
+    public SearchParams routing(Iterable<String> routings) {
+        checkArgument(routings != null,
+                "routing set must be non-null");
+        routing(String.join(",", routings));
+        return this;
+    }
+
     @Override
     public Iterator<Entry<String, Object>> iterator() {
         return params.entrySet().iterator();
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 4cfbd62..ed97c2f 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -170,6 +170,8 @@
     advanced: ${SW_STORAGE_ES_ADVANCED:""}
     # Enable shard metrics and records indices into multi-physical indices, one index template per metric/meter aggregation function or record.
     logicSharding: ${SW_STORAGE_ES_LOGIC_SHARDING:false}
+    # Custom routing can reduce the impact of searches. Instead of having to fan out a search request to all the shards in an index, the request can be sent to just the shard that matches the specific routing value (or values).
+    enableCustomRouting: ${SW_STORAGE_ES_ENABLE_CUSTOM_ROUTING:false}
   h2:
     properties:
       jdbcUrl: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index c3c26e3..f9f2644 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -154,4 +154,9 @@
      * @since 9.2.0
      */
     private boolean logicSharding = false;
+
+    /**
+     * if enabled, custom routing values will be used, to reduce the number of shards that need to be searched.
+     */
+    private boolean enableCustomRouting = false;
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index c0124dc..ff3f49f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -240,6 +240,7 @@
         this.registerServiceImplementation(
             ISpanAttachedEventQueryDAO.class, new SpanAttachedEventEsDAO(elasticSearchClient, config));
         IndexController.INSTANCE.setLogicSharding(config.isLogicSharding());
+        IndexController.INSTANCE.setEnableCustomRouting(config.isEnableCustomRouting());
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
index 6f1dfdf..99f6dc6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
@@ -49,6 +49,10 @@
     @Getter
     private boolean logicSharding = false;
 
+    @Setter
+    @Getter
+    private boolean enableCustomRouting = false;
+
     public String getTableName(Model model) {
         if (!logicSharding) {
             return isMetricModel(model) ? "metrics-all" :
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index 21ffbb6..5d428ef 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -43,6 +45,7 @@
         Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
         String id = IndexController.INSTANCE.generateDocId(model, record.id().build());
-        return getClient().prepareInsert(modelName, id, builder);
+        Optional<String> routingValue = RoutingUtils.getRoutingValue(model, toStorage);
+        return getClient().prepareInsert(modelName, id, routingValue, builder);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java
new file mode 100644
index 0000000..bc5b5a4
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
+
+import java.util.Optional;
+
+public class RoutingUtils {
+
+    public static void addRoutingValueToSearchParam(SearchParams searchParams, String routingValue) {
+        if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+            return;
+        }
+        searchParams.routing(routingValue);
+    }
+
+    public static void addRoutingValuesToSearchParam(SearchParams searchParams, Iterable<String> routingValues) {
+        if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+            return;
+        }
+        searchParams.routing(routingValues);
+    }
+
+    /**
+     * get the value of the field annotated {@link ElasticSearch.Routing}
+     */
+    public static Optional<String> getRoutingValue(final Model model, final ElasticSearchConverter.ToStorage toStorage) {
+        if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+            return Optional.empty();
+        }
+        Optional<String> routingField = model.getElasticSearchModelExtension().getRouting();
+        return routingField.map(v -> extractRoutingValue(v, toStorage));
+    }
+
+    private static String extractRoutingValue(String routingField, ElasticSearchConverter.ToStorage toStorage) {
+        Object value = toStorage.get(routingField);
+        if (value == null) {
+            return null;
+        }
+        return value.toString();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index 90b683d..7e98512 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -29,6 +29,7 @@
 import org.apache.skywalking.library.elasticsearch.requests.search.Search;
 import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
 import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
@@ -48,6 +49,7 @@
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RoutingUtils;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
 
 import static java.util.Objects.nonNull;
@@ -177,7 +179,10 @@
                   .query(Query.term(SegmentRecord.TRACE_ID, traceId))
                   .size(segmentQueryMaxSize);
 
-        final SearchResponse response = getClient().search(index, search.build());
+        SearchParams searchParams = new SearchParams();
+        RoutingUtils.addRoutingValueToSearchParam(searchParams, traceId);
+
+        final SearchResponse response = getClient().search(index, search.build(), searchParams);
 
         List<SegmentRecord> segmentRecords = new ArrayList<>();
         for (SearchHit searchHit : response.getHits().getHits()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 3db659d..f7734f0 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -48,6 +48,7 @@
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RoutingUtils;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
 import zipkin2.Span;
 import zipkin2.storage.QueryRequest;
@@ -140,6 +141,7 @@
         BoolQueryBuilder query = Query.bool().must(Query.term(ZipkinSpanRecord.TRACE_ID, traceId));
         SearchBuilder search = Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
         final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+        RoutingUtils.addRoutingValueToSearchParam(params, traceId);
         SearchResponse response = getClient().search(index, search.build(), params);
         final Set<String> scrollIds = new HashSet<>();
         List<Span> trace = new ArrayList<>();
@@ -237,6 +239,7 @@
         SearchBuilder search = Search.builder().query(query).sort(ZipkinSpanRecord.TIMESTAMP_MILLIS, Sort.Order.DESC)
                                      .size(SCROLLING_BATCH_SIZE); //max span size for 1 scroll
         final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+        RoutingUtils.addRoutingValuesToSearchParam(params, traceIds);
 
         SearchResponse response = getClient().search(index, search.build(), params);
         final Set<String> scrollIds = new HashSet<>();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
index fe3dcbc..39b7b99 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -45,12 +45,12 @@
 
         final Type listFieldType = this.getClass().getField("a").getGenericType();
         Assert.assertEquals("keyword", mapping.transform(List.class, listFieldType,
-                                                         new ElasticSearchExtension(null, null, false)
+                                                         new ElasticSearchExtension(null, null, false, false)
         ));
 
         Assert.assertEquals("keyword", mapping.transform(IntList.class, int.class,
-                                                         new ElasticSearchExtension(null, null, true)));
+                                                         new ElasticSearchExtension(null, null, true, false)));
         Assert.assertEquals("text", mapping.transform(IntList.class, int.class,
-                                                         new ElasticSearchExtension(null, null, false)));
+                                                         new ElasticSearchExtension(null, null, false, false)));
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index 8114e68..4967a1f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -24,6 +24,7 @@
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.query.enumeration.Step;
 import org.apache.skywalking.oap.server.core.storage.model.BanyanDBModelExtension;
+import org.apache.skywalking.oap.server.core.storage.model.ElasticSearchModelExtension;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
 import org.junit.Assert;
@@ -43,15 +44,15 @@
     public void prepare() {
         superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(),
                                       0, DownSampling.Second, true, true, Record.class, true,
-                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(),
                                       0, DownSampling.Second, true, false, Record.class, true,
-                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(),
                                        0, DownSampling.Minute, false, false, Metrics.class, true,
-                                       new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+                                       new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
         TimeSeriesUtils.setDAY_STEP(3);