[Optional] Support trace/span on ElasticSearch to use traceIDs routing. (#10282)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 7f915a8..8abd796 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -80,6 +80,7 @@
* Add Python HBase happybase module component ID(94).
* Fix gRPC alarm cannot update settings from dynamic configuration source.
* Add Python Websocket module component ID(7018).
+* [**Breaking Change**] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing.
#### UI
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 2dfd67a..5abe9fa 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -107,6 +107,8 @@
# Set it to `true` could shard metrics indices into multi-physical indices
# as same as the versions(one index template per metric/meter aggregation function) before 9.2.0.
logicSharding: ${SW_STORAGE_ES_LOGIC_SHARDING:false}
+ # Custom routing can reduce the impact of searches. Instead of having to fan out a search request to all the shards in an index, the request can be sent to just the shard that matches the specific routing value (or values).
+ enableCustomRouting: ${SW_STORAGE_ES_ENABLE_CUSTOM_ROUTING:false}
```
### ElasticSearch With Https SSL Encrypting communications.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index b5ceed2..6694de9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -32,6 +32,7 @@
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -67,6 +68,7 @@
@Getter
@Column(columnName = TRACE_ID, length = 150)
@BanyanDB.GlobalIndex
+ @ElasticSearch.Routing
private String traceId;
@Setter
@Getter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
index b592e55..765ab38 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
@@ -23,6 +23,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
/**
* ElasticSearch annotation is a holder including all annotations for ElasticSearch storage
@@ -97,4 +98,12 @@
String columnAlias();
}
+
+ /**
+ * Routing defines a field of {@link Record} to control the sharding policy.
+ */
+ @Target(ElementType.FIELD)
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface Routing {
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
index 2ff5264..8be7062 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
@@ -40,6 +40,8 @@
private final boolean isKeyword;
+ private final boolean isRouting;
+
public boolean needMatchQuery() {
return analyzer != null;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java
new file mode 100644
index 0000000..cdad596
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchModelExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.model;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class ElasticSearchModelExtension {
+
+ /**
+ * Routing defines a field of {@link Record} to control the sharding policy.
+ */
+ @Getter
+ private Optional<String> routing = Optional.empty();
+
+ public void setRouting(String modelName, List<ModelColumn> modelColumns) throws IllegalStateException {
+ if (CollectionUtils.isEmpty(modelColumns)) {
+ return;
+ }
+
+ List<ModelColumn> routingColumns = modelColumns.stream()
+ .filter(col -> col.getElasticSearchExtension().isRouting())
+ .collect(Collectors.toList());
+
+ int size = routingColumns.size();
+ if (size > 1) {
+ throw new IllegalStateException(modelName + "'s routing field is duplicated "
+ + routingColumns.stream()
+ .map(col -> col.getColumnName().toString())
+ .collect(Collectors.joining(",", "[", "]")));
+ }
+
+ if (size == 1) {
+ routing = Optional.of(routingColumns.get(0).getColumnName().getName());
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 23514dd..c346800 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -40,6 +40,7 @@
private final boolean timeRelativeID;
private final SQLDatabaseModelExtension sqlDBModelExtension;
private final BanyanDBModelExtension banyanDBModelExtension;
+ private final ElasticSearchModelExtension elasticSearchModelExtension;
public Model(final String name,
final List<ModelColumn> columns,
@@ -50,7 +51,8 @@
final Class<?> streamClass,
boolean timeRelativeID,
final SQLDatabaseModelExtension sqlDBModelExtension,
- final BanyanDBModelExtension banyanDBModelExtension) {
+ final BanyanDBModelExtension banyanDBModelExtension,
+ final ElasticSearchModelExtension elasticSearchModelExtension) {
this.name = name;
this.columns = columns;
this.scopeId = scopeId;
@@ -62,5 +64,6 @@
this.timeRelativeID = timeRelativeID;
this.sqlDBModelExtension = sqlDBModelExtension;
this.banyanDBModelExtension = banyanDBModelExtension;
+ this.elasticSearchModelExtension = elasticSearchModelExtension;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index bd7b5d4..109e176 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -60,6 +60,7 @@
ShardingKeyChecker checker = new ShardingKeyChecker();
SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
+ ElasticSearchModelExtension elasticSearchModelExtension = new ElasticSearchModelExtension();
retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
// Add extra column for additional entities
if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
@@ -96,7 +97,9 @@
if (aClass.isAnnotationPresent(BanyanDB.StoreIDAsTag.class)) {
banyanDBModelExtension.setStoreIDTag(true);
}
-
+ // Set routing rules for ElasticSearch
+ elasticSearchModelExtension.setRouting(storage.getModelName(), modelColumns);
+
checker.check(storage.getModelName());
Model model = new Model(
@@ -109,7 +112,8 @@
aClass,
storage.isTimeRelativeID(),
sqlDBModelExtension,
- banyanDBModelExtension
+ banyanDBModelExtension,
+ elasticSearchModelExtension
);
this.followColumnNameRules(model);
@@ -191,10 +195,12 @@
ElasticSearch.MatchQuery.class);
final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
final ElasticSearch.Keyword keywordColumn = field.getAnnotation(ElasticSearch.Keyword.class);
+ final ElasticSearch.Routing routingColumn = field.getAnnotation(ElasticSearch.Routing.class);
ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension(
elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
elasticSearchColumn == null ? null : elasticSearchColumn.columnAlias(),
- keywordColumn != null
+ keywordColumn != null,
+ routingColumn != null
);
// BanyanDB extension
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 0d58266..f8305dc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -36,6 +36,7 @@
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -84,6 +85,7 @@
@Column(columnName = TRACE_ID)
@SQLDatabase.AdditionalEntity(additionalTables = {ADDITIONAL_QUERY_TABLE}, reserveOriginalColumns = true)
@BanyanDB.SeriesID(index = 0)
+ @ElasticSearch.Routing
private String traceId;
@Setter
@Getter
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index 5cd1666..c50cc7a 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -31,7 +31,7 @@
false, false, true, 0,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
- ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+ ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
);
Assert.assertEquals(true, column.isStorageOnly());
@@ -40,7 +40,7 @@
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class,
false, false, true, 200,
new SQLDatabaseExtension(),
- new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+ new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
);
Assert.assertEquals(true, column.isStorageOnly());
@@ -50,7 +50,7 @@
column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
false, false, true, 200,
new SQLDatabaseExtension(),
- new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+ new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
);
Assert.assertEquals(false, column.isStorageOnly());
@@ -63,7 +63,7 @@
true, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
- ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+ ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
);
}
@@ -74,7 +74,7 @@
true, true, false, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
- ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
+ ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false),
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED, false)
);
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 7167c02..346dbe1 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -348,8 +348,13 @@
public IndexRequestWrapper prepareInsert(String indexName, String id,
Map<String, Object> source) {
+ return prepareInsert(indexName, id, Optional.empty(), source);
+ }
+
+ public IndexRequestWrapper prepareInsert(String indexName, String id, Optional<String> routingValue,
+ Map<String, Object> source) {
indexName = indexNameConverter.apply(indexName);
- return new IndexRequestWrapper(indexName, TYPE, id, source);
+ return new IndexRequestWrapper(indexName, TYPE, id, routingValue, source);
}
public UpdateRequestWrapper prepareUpdate(String indexName, String id,
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
index e6bd63e..ecda4fa 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import java.util.Map;
+import java.util.Optional;
+
import lombok.Getter;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -28,10 +30,17 @@
public IndexRequestWrapper(String index, String type, String id,
Map<String, ?> source) {
+ this(index, type, id, Optional.empty(), source);
+ }
+
+ public IndexRequestWrapper(String index, String type, String id,
+ Optional<String> routing,
+ Map<String, ?> source) {
request = IndexRequest.builder()
.index(index)
.type(type)
.id(id)
+ .routing(routing)
.doc(source)
.build();
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
index 818c3b9..9d9e14b 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/IndexRequest.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.library.elasticsearch.requests;
import java.util.Map;
+import java.util.Optional;
+
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
@@ -29,5 +31,10 @@
private final String index;
private final String type;
private final String id;
+ /**
+ * The routing value of the request.
+ */
+ @Builder.Default
+ private final Optional<String> routing = Optional.empty();
private final Map<String, ?> doc;
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
index 467a444..c25e9d6 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v6/codec/V6IndexRequestSerializer.java
@@ -38,6 +38,9 @@
gen.writeStringField("_index", value.getIndex());
gen.writeStringField("_type", value.getType());
gen.writeStringField("_id", value.getId());
+ if (value.getRouting().isPresent()) {
+ gen.writeStringField("routing", value.getRouting().get());
+ }
}
gen.writeEndObject();
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
index 87e2dfd..d4b1019 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/factory/v7plus/codec/V7IndexRequestSerializer.java
@@ -37,6 +37,9 @@
{
gen.writeStringField("_index", value.getIndex());
gen.writeStringField("_id", value.getId());
+ if (value.getRouting().isPresent()) {
+ gen.writeStringField("routing", value.getRouting().get());
+ }
}
gen.writeEndObject();
}
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
index 26a9715..f9a070d 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/requests/search/SearchParams.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.library.elasticsearch.requests.search;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
import static com.google.common.base.Preconditions.checkArgument;
import java.time.Duration;
import java.util.HashMap;
@@ -31,6 +33,7 @@
private static final String ALLOW_NO_INDICES = "allow_no_indices";
private static final String EXPAND_WILDCARDS = "expand_wildcards";
private static final String SCROLL = "scroll";
+ private static final String ROUTING = "routing";
private final Map<String, Object> params = new HashMap<>();
@@ -59,6 +62,20 @@
return this;
}
+ public SearchParams routing(String routing) {
+ checkArgument(StringUtil.isNotBlank(routing),
+ "routing must be not blank");
+ params.put(ROUTING, routing);
+ return this;
+ }
+
+ public SearchParams routing(Iterable<String> routings) {
+ checkArgument(routings != null,
+ "routing set must be non-null");
+ routing(String.join(",", routings));
+ return this;
+ }
+
@Override
public Iterator<Entry<String, Object>> iterator() {
return params.entrySet().iterator();
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 4cfbd62..ed97c2f 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -170,6 +170,8 @@
advanced: ${SW_STORAGE_ES_ADVANCED:""}
# Enable shard metrics and records indices into multi-physical indices, one index template per metric/meter aggregation function or record.
logicSharding: ${SW_STORAGE_ES_LOGIC_SHARDING:false}
+ # Custom routing can reduce the impact of searches. Instead of having to fan out a search request to all the shards in an index, the request can be sent to just the shard that matches the specific routing value (or values).
+ enableCustomRouting: ${SW_STORAGE_ES_ENABLE_CUSTOM_ROUTING:false}
h2:
properties:
jdbcUrl: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index c3c26e3..f9f2644 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -154,4 +154,9 @@
* @since 9.2.0
*/
private boolean logicSharding = false;
+
+ /**
+ * if enabled, custom routing values will be used, to reduce the number of shards that need to be searched.
+ */
+ private boolean enableCustomRouting = false;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index c0124dc..ff3f49f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -240,6 +240,7 @@
this.registerServiceImplementation(
ISpanAttachedEventQueryDAO.class, new SpanAttachedEventEsDAO(elasticSearchClient, config));
IndexController.INSTANCE.setLogicSharding(config.isLogicSharding());
+ IndexController.INSTANCE.setEnableCustomRouting(config.isEnableCustomRouting());
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
index 6f1dfdf..99f6dc6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
@@ -49,6 +49,10 @@
@Getter
private boolean logicSharding = false;
+ @Setter
+ @Getter
+ private boolean enableCustomRouting = false;
+
public String getTableName(Model model) {
if (!logicSharding) {
return isMetricModel(model) ? "metrics-all" :
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index 21ffbb6..5d428ef 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -43,6 +45,7 @@
Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, record.id().build());
- return getClient().prepareInsert(modelName, id, builder);
+ Optional<String> routingValue = RoutingUtils.getRoutingValue(model, toStorage);
+ return getClient().prepareInsert(modelName, id, routingValue, builder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java
new file mode 100644
index 0000000..bc5b5a4
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RoutingUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
+
+import java.util.Optional;
+
+public class RoutingUtils {
+
+ public static void addRoutingValueToSearchParam(SearchParams searchParams, String routingValue) {
+ if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+ return;
+ }
+ searchParams.routing(routingValue);
+ }
+
+ public static void addRoutingValuesToSearchParam(SearchParams searchParams, Iterable<String> routingValues) {
+ if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+ return;
+ }
+ searchParams.routing(routingValues);
+ }
+
+ /**
+ * get the value of the field annotated {@link ElasticSearch.Routing}
+ */
+ public static Optional<String> getRoutingValue(final Model model, final ElasticSearchConverter.ToStorage toStorage) {
+ if (!IndexController.INSTANCE.isEnableCustomRouting()) {
+ return Optional.empty();
+ }
+ Optional<String> routingField = model.getElasticSearchModelExtension().getRouting();
+ return routingField.map(v -> extractRoutingValue(v, toStorage));
+ }
+
+ private static String extractRoutingValue(String routingField, ElasticSearchConverter.ToStorage toStorage) {
+ Object value = toStorage.get(routingField);
+ if (value == null) {
+ return null;
+ }
+ return value.toString();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index 90b683d..7e98512 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -29,6 +29,7 @@
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
+import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
@@ -48,6 +49,7 @@
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RoutingUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import static java.util.Objects.nonNull;
@@ -177,7 +179,10 @@
.query(Query.term(SegmentRecord.TRACE_ID, traceId))
.size(segmentQueryMaxSize);
- final SearchResponse response = getClient().search(index, search.build());
+ SearchParams searchParams = new SearchParams();
+ RoutingUtils.addRoutingValueToSearchParam(searchParams, traceId);
+
+ final SearchResponse response = getClient().search(index, search.build(), searchParams);
List<SegmentRecord> segmentRecords = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 3db659d..f7734f0 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -48,6 +48,7 @@
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RoutingUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import zipkin2.Span;
import zipkin2.storage.QueryRequest;
@@ -140,6 +141,7 @@
BoolQueryBuilder query = Query.bool().must(Query.term(ZipkinSpanRecord.TRACE_ID, traceId));
SearchBuilder search = Search.builder().query(query).size(SCROLLING_BATCH_SIZE);
final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+ RoutingUtils.addRoutingValueToSearchParam(params, traceId);
SearchResponse response = getClient().search(index, search.build(), params);
final Set<String> scrollIds = new HashSet<>();
List<Span> trace = new ArrayList<>();
@@ -237,6 +239,7 @@
SearchBuilder search = Search.builder().query(query).sort(ZipkinSpanRecord.TIMESTAMP_MILLIS, Sort.Order.DESC)
.size(SCROLLING_BATCH_SIZE); //max span size for 1 scroll
final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+ RoutingUtils.addRoutingValuesToSearchParam(params, traceIds);
SearchResponse response = getClient().search(index, search.build(), params);
final Set<String> scrollIds = new HashSet<>();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
index fe3dcbc..39b7b99 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -45,12 +45,12 @@
final Type listFieldType = this.getClass().getField("a").getGenericType();
Assert.assertEquals("keyword", mapping.transform(List.class, listFieldType,
- new ElasticSearchExtension(null, null, false)
+ new ElasticSearchExtension(null, null, false, false)
));
Assert.assertEquals("keyword", mapping.transform(IntList.class, int.class,
- new ElasticSearchExtension(null, null, true)));
+ new ElasticSearchExtension(null, null, true, false)));
Assert.assertEquals("text", mapping.transform(IntList.class, int.class,
- new ElasticSearchExtension(null, null, false)));
+ new ElasticSearchExtension(null, null, false, false)));
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index 8114e68..4967a1f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -24,6 +24,7 @@
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.core.storage.model.BanyanDBModelExtension;
+import org.apache.skywalking.oap.server.core.storage.model.ElasticSearchModelExtension;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.junit.Assert;
@@ -43,15 +44,15 @@
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(),
0, DownSampling.Second, true, true, Record.class, true,
- new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(),
0, DownSampling.Second, true, false, Record.class, true,
- new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(),
0, DownSampling.Minute, false, false, Metrics.class, true,
- new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
TimeSeriesUtils.setDAY_STEP(3);