Support Elasticsearch column alias for the compatibility between storage logicSharding model and no-logicSharding model. (#9442)

## New ElasticSearch storage option explanation in 9.2.0
Since v9.2.0, SkyWalking OAP provides 2 storage options for metrics/meter and records, 
system environment variable is (`SW_STORAGE_ES_LOGIC_SHARDING`):

### No-Sharding Model (OAP default setting, `SW_STORAGE_ES_LOGIC_SHARDING = false`)
1. OAP merges all metrics/meter and records(without super datasets, such as segments) indices into one physical 
index template `metrics-all` and `records-all`.
2. The logic index name would be present in columns `metric_table` and `record_table`.
3. If the logic column name has an alias (configured by `@ElasticSearch.Column()`), the alias would be the real physical column name.

### No-Sharding Model (`SW_STORAGE_ES_LOGIC_SHARDING = true `)
1. OAP shard metrics/meter indices into multi-physical indices as in the previous versions(one index template per metric/meter aggregation function).
2. Records and metrics without configuring aggregation function in `@MetricsFunction` and `@MeterFunction` would not be sharded.
3. The shard template name would be `metrics-aggregation function name` or `meter-aggregation function name` such as `metrics-count`,
and the logic index name would be present in column `metric_table`.
4. The OAP **would not** use the column alias, the logic column name would be the real physical column name.

**Notice**: 
Users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out no matter the option.
diff --git a/docs/en/FAQ/New-ElasticSearch-storage-option-explanation-in-9.2.0.md b/docs/en/FAQ/New-ElasticSearch-storage-option-explanation-in-9.2.0.md
new file mode 100644
index 0000000..11a2f7b
--- /dev/null
+++ b/docs/en/FAQ/New-ElasticSearch-storage-option-explanation-in-9.2.0.md
@@ -0,0 +1,19 @@
+## New ElasticSearch storage option explanation in 9.2.0
+Since v9.2.0, SkyWalking OAP provide 2 storage options for metrics/meter and records, 
+system environment variable is (`SW_STORAGE_ES_LOGIC_SHARDING`):
+
+### No-Sharding Model (OAP default setting, `SW_STORAGE_ES_LOGIC_SHARDING = false`)
+1. OAP merge all metrics/meter and records(without super datasets, such as segments) indices into one physical 
+index template `metrics-all` and `records-all`.
+2. The logic index name would present in column `metric_table` and `record_table`.
+3. If logic column name has alias (configured by `@ElasticSearch.Column()`), the alias would be the real physical column name.
+
+### No-Sharding Model (`SW_STORAGE_ES_LOGIC_SHARDING = true `)
+1. OAP shard metrics/meter indices into multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
+2. Records and metrics without configure aggregation function in `@MetricsFunction` and `@MeterFunction` would not be sharded.
+3. The shard template name would be `metrics-aggregation function name` or `meter-aggregation function name` such as `metrics-count`,
+and the logic index name would present in column `metric_table`.
+4. The OAP **would not** use the column alias, the logic column name would be the real physical column name.
+
+**Notice**: 
+Users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out no matter the option is.
diff --git a/docs/en/FAQ/README.md b/docs/en/FAQ/README.md
index f44abac..ed43bbf 100644
--- a/docs/en/FAQ/README.md
+++ b/docs/en/FAQ/README.md
@@ -11,6 +11,7 @@
 * [Compiling issues on Mac's M1 chip](How-to-build-with-mac-m1.md)
 
 ## Runtime
+* [New ElasticSearch storage option explanation in 9.2.0](New-ElasticSearch-storage-option-explanation-in-9.2.0.md)
 * [Version 9.x+ upgrade](v9-version-upgrade.md)
 * [Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0](es-version-conflict.md)
 * [Version 8.x+ upgrade](v8-version-upgrade.md)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 6d39be5..26d3ea9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -36,15 +36,17 @@
 * Add `VIRTUAL_CACHE` to Layer, to fix conjectured Redis server, which icon can't show on the topology.
 * [Breaking Change] Elasticsearch storage merge all metrics/meter and records(without super datasets) indices into one
   physical index template `metrics-all` and `records-all` on the default setting.
-  Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into
+  Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/meter indices into
   multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
   In the current one index mode, users still could choose to adjust ElasticSearch's shard
   number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
-* [Breaking Change] Many columns of metrics and records model names are changed, The H2/Mysql/Tidb/Postgres storage
-  users are required to remove all metrics-related and records-related tables for OAP to re-create or use a new database
-  instance.
+  More details please refer to [New ElasticSearch storage option explanation in 9.2.0](../FAQ/New-ElasticSearch-storage-option-explanation-in-9.2.0.md)
+  and [backend-storage.md](../setup/backend/backend-storage.md)
+* [Breaking Change] Index/table `ebpf_profiling_schedule` added a new column `ebpf_profiling_schedule_id`,
+  the H2/Mysql/Tidb/Postgres storage users are required to re-created it when bump up from previous releases.
 * Fix Zipkin trace query the max size of spans.
 * Add `tls` and `https` component IDs for Network Profiling.
+* Support Elasticsearch column alias for the compatibility between storage logicSharding model and no-logicSharding model.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/EndpointTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/EndpointTraffic.java
index fa67c33..0118af4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/EndpointTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpoint/EndpointTraffic.java
@@ -45,7 +45,7 @@
     public static final String INDEX_NAME = "endpoint_traffic";
 
     public static final String SERVICE_ID = "service_id";
-    public static final String NAME = "endpoint_traffic_name";
+    public static final String NAME = "name";
 
     @Setter
     @Getter
@@ -55,6 +55,7 @@
     @Getter
     @Column(columnName = NAME)
     @ElasticSearch.MatchQuery
+    @ElasticSearch.Column(columnAlias = "endpoint_traffic_name")
     private String name = Const.EMPTY_STRING;
 
     @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/InstanceTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/InstanceTraffic.java
index 14098da..dab5bb7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/InstanceTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/instance/InstanceTraffic.java
@@ -31,6 +31,7 @@
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -48,7 +49,7 @@
 public class InstanceTraffic extends Metrics {
     public static final String INDEX_NAME = "instance_traffic";
     public static final String SERVICE_ID = "service_id";
-    public static final String NAME = "instance_traffic_name";
+    public static final String NAME = "name";
     public static final String LAST_PING_TIME_BUCKET = "last_ping";
     public static final String PROPERTIES = "properties";
 
@@ -62,6 +63,7 @@
     @Setter
     @Getter
     @Column(columnName = NAME, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "instance_traffic_name")
     private String name;
 
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceTraffic.java
index c7bd000..70bc83d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceTraffic.java
@@ -49,7 +49,7 @@
 public class ServiceTraffic extends Metrics {
     public static final String INDEX_NAME = "service_traffic";
 
-    public static final String NAME = "service_traffic_name";
+    public static final String NAME = "name";
 
     public static final String SHORT_NAME = "short_name";
 
@@ -63,6 +63,7 @@
     @Getter
     @Column(columnName = NAME)
     @ElasticSearch.MatchQuery
+    @ElasticSearch.Column(columnAlias = "service_traffic_name")
     private String name = Const.EMPTY_STRING;
 
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
index 6fa1656..39be2b6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
@@ -39,6 +39,7 @@
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -52,7 +53,7 @@
 public abstract class PercentileFunction extends Meter implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
     public static final String DATASET = "dataset";
     public static final String RANKS = "ranks";
-    public static final String VALUE = "datatable_value";
+    public static final String VALUE = "value";
 
     @Setter
     @Getter
@@ -62,6 +63,7 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_value")
     private DataTable percentileValues = new DataTable(10);
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramFunction.java
index 6c48701..35dc544 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramFunction.java
@@ -36,6 +36,7 @@
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -55,8 +56,8 @@
 @ToString
 public abstract class AvgHistogramFunction extends Meter implements AcceptableValue<BucketedValues> {
     public static final String DATASET = "dataset";
-    protected static final String SUMMATION = "datatable_summation";
-    protected static final String COUNT = "datatable_count";
+    protected static final String SUMMATION = "summation";
+    protected static final String COUNT = "count";
 
     @Setter
     @Getter
@@ -66,10 +67,12 @@
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_summation")
     protected DataTable summation = new DataTable(30);
     @Getter
     @Setter
     @Column(columnName = COUNT, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_count")
     protected DataTable count = new DataTable(30);
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramPercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramPercentileFunction.java
index 71aa4c3..ace67de 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramPercentileFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgHistogramPercentileFunction.java
@@ -45,6 +45,7 @@
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -69,9 +70,9 @@
     private static final String DEFAULT_GROUP = "pD";
     public static final String DATASET = "dataset";
     public static final String RANKS = "ranks";
-    public static final String VALUE = "datatable_value";
-    protected static final String SUMMATION = "datatable_summation";
-    protected static final String COUNT = "datatable_count";
+    public static final String VALUE = "value";
+    protected static final String SUMMATION = "summation";
+    protected static final String COUNT = "count";
 
     @Setter
     @Getter
@@ -81,14 +82,17 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_value")
     private DataTable percentileValues = new DataTable(10);
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_summation")
     protected DataTable summation = new DataTable(30);
     @Getter
     @Setter
     @Column(columnName = COUNT, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_count")
     protected DataTable count = new DataTable(30);
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgLabeledFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgLabeledFunction.java
index b69c956..d971ef7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgLabeledFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/avg/AvgLabeledFunction.java
@@ -36,6 +36,7 @@
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -43,9 +44,9 @@
 @MeterFunction(functionName = "avgLabeled")
 @ToString
 public abstract class AvgLabeledFunction extends Meter implements AcceptableValue<DataTable>, LabeledValueHolder {
-    protected static final String SUMMATION = "datatable_summation";
-    protected static final String COUNT = "datatable_count";
-    protected static final String VALUE = "datatable_value";
+    protected static final String SUMMATION = "summation";
+    protected static final String COUNT = "count";
+    protected static final String VALUE = "value";
 
     @Setter
     @Getter
@@ -64,14 +65,17 @@
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_summation")
     protected DataTable summation = new DataTable(30);
     @Getter
     @Setter
     @Column(columnName = COUNT, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_count")
     protected DataTable count = new DataTable(30);
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_value")
     private DataTable value = new DataTable(30);
 
     @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumHistogramPercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumHistogramPercentileFunction.java
index b375035..a06ce93 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumHistogramPercentileFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumHistogramPercentileFunction.java
@@ -39,6 +39,7 @@
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@@ -61,8 +62,8 @@
     private static final String DEFAULT_GROUP = "pD";
     public static final String DATASET = "dataset";
     public static final String RANKS = "ranks";
-    public static final String VALUE = "datatable_value";
-    protected static final String SUMMATION = "datatable_summation";
+    public static final String VALUE = "value";
+    protected static final String SUMMATION = "summation";
 
     @Setter
     @Getter
@@ -72,10 +73,12 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_value")
     private DataTable percentileValues = new DataTable(10);
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_summation")
     protected DataTable summation = new DataTable(30);
     /**
      * Rank
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/ApdexMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/ApdexMetrics.java
index 1fb724d..873a153 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/ApdexMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/ApdexMetrics.java
@@ -27,6 +27,7 @@
 import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
 import org.apache.skywalking.oap.server.core.query.sql.Function;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 
 /**
  * Apdex dissatisfaction levels of Tolerating (apdex_t) and Frustrated (apdex_f) indicate how slow site performance
@@ -44,7 +45,7 @@
     protected static final String S_NUM = "s_num";
     // Level: tolerated
     protected static final String T_NUM = "t_num";
-    protected static final String VALUE = "int_value";
+    protected static final String VALUE = "value";
 
     @Getter
     @Setter
@@ -61,6 +62,7 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
+    @ElasticSearch.Column(columnAlias = "int_value")
     private int value;
 
     @Entrance
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DoubleAvgMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DoubleAvgMetrics.java
index f11036a..1e9d7b7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DoubleAvgMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DoubleAvgMetrics.java
@@ -26,17 +26,19 @@
 import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
 import org.apache.skywalking.oap.server.core.query.sql.Function;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 
 @MetricsFunction(functionName = "doubleAvg")
 public abstract class DoubleAvgMetrics extends Metrics implements DoubleValueHolder {
 
-    protected static final String SUMMATION = "double_summation";
+    protected static final String SUMMATION = "summation";
     protected static final String COUNT = "count";
-    protected static final String VALUE = "double_value";
+    protected static final String VALUE = "value";
 
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "double_summation")
     private double summation;
     @Getter
     @Setter
@@ -45,6 +47,7 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
+    @ElasticSearch.Column(columnAlias = "double_value")
     private double value;
 
     @Entrance
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
index 3808330..24aa7b7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
@@ -28,6 +28,7 @@
 import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
 import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 
 /**
  * Percentile is a better implementation than deprecated PxxMetrics in older releases.
@@ -38,7 +39,7 @@
 @MetricsFunction(functionName = "percentile")
 public abstract class PercentileMetrics extends Metrics implements MultiIntValuesHolder {
     protected static final String DATASET = "dataset";
-    protected static final String VALUE = "datatable_value";
+    protected static final String VALUE = "value";
     protected static final String PRECISION = "precision";
 
     private static final int[] RANKS = {
@@ -52,6 +53,7 @@
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @ElasticSearch.Column(columnAlias = "datatable_value")
     private DataTable percentileValues;
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
index ddcec7d..5d8b683 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
@@ -68,4 +68,18 @@
             }
         }
     }
+
+    @Target({ElementType.FIELD})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Column {
+
+        /**
+         * Warning: this is only used to solve the conflict among the existing columns since we need support to merge all metrics
+         * in one physical index template. When creating a new column, we should avoid the compatibility issue
+         * between these 2 storage modes rather than use this alias.
+         */
+        @Deprecated
+        String columnAlias();
+
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
index 11566721..3323bd8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
@@ -36,6 +36,8 @@
      */
     private final ElasticSearch.MatchQuery.AnalyzerType analyzer;
 
+    private final String columnAlias;
+
     public boolean needMatchQuery() {
         return analyzer != null;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 28a47de..2d85c2e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -162,8 +162,10 @@
                 // ElasticSearch extension
                 final ElasticSearch.MatchQuery elasticSearchAnalyzer = field.getAnnotation(
                     ElasticSearch.MatchQuery.class);
+                final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
                 ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension(
-                    elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer()
+                    elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
+                    elasticSearchColumn == null ? null : elasticSearchColumn.columnAlias()
                 );
 
                 // BanyanDB extension
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index ff48a20..d9fc9dd 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -30,7 +30,7 @@
                                              false, false, true, 0,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
                                              new BanyanDBExtension(-1, false, true)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -39,7 +39,7 @@
         column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
+                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
                                  new BanyanDBExtension(-1, false, true)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -49,7 +49,7 @@
         column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
+                                 new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
                                  new BanyanDBExtension(-1, false, true)
         );
         Assert.assertEquals(false, column.isStorageOnly());
@@ -62,7 +62,7 @@
                                              true, false, true, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
                                              new BanyanDBExtension(-1, false, true)
         );
     }
@@ -73,7 +73,7 @@
                                              true, true, false, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER),
+                                                 ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
                                              new BanyanDBExtension(-1, false, true)
         );
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchConverter.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchConverter.java
new file mode 100644
index 0000000..0cef8fb
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchConverter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+/**
+ * ElasticSearchConverter same as the HashMapConverter, but translate the column alias automatically.
+ */
+public class ElasticSearchConverter {
+
+    @RequiredArgsConstructor
+    public static class ToEntity implements Convert2Entity {
+        private final String modelName;
+        private final Map<String, Object> source;
+
+        @Override
+        public Object get(final String fieldName) {
+            return source.get(getPhysicalColumnName(modelName, fieldName));
+        }
+
+        @Override
+        public byte[] getBytes(final String fieldName) {
+            final String value = (String) source.get(getPhysicalColumnName(modelName, fieldName));
+            if (StringUtil.isEmpty(value)) {
+                return new byte[] {};
+            }
+            return Base64.getDecoder().decode(value);
+        }
+    }
+
+    public static class ToStorage implements Convert2Storage<Map<String, Object>> {
+        private Map<String, Object> source;
+        private String modelName;
+
+        public ToStorage(String modelName) {
+            source = new HashMap();
+            this.modelName = modelName;
+        }
+
+        @Override
+        public void accept(final String fieldName, final Object fieldValue) {
+            source.put(getPhysicalColumnName(modelName, fieldName)
+                , fieldValue);
+        }
+
+        @Override
+        public void accept(final String fieldName, final byte[] fieldValue) {
+            if (CollectionUtils.isEmpty(fieldValue)) {
+                source.put(getPhysicalColumnName(modelName, fieldName), Const.EMPTY_STRING);
+            } else {
+                source.put(getPhysicalColumnName(modelName, fieldName), new String(Base64.getEncoder().encode(fieldValue)));
+            }
+        }
+
+        @Override
+        public void accept(final String fieldName, final List<String> fieldValue) {
+            this.accept(getPhysicalColumnName(modelName, fieldName), (Object) fieldValue);
+        }
+
+        @Override
+        public Object get(final String fieldName) {
+            return source.get(getPhysicalColumnName(modelName, fieldName));
+        }
+
+        @Override
+        public Map<String, Object> obtain() {
+            return source;
+        }
+    }
+
+    private static String getPhysicalColumnName(String modelName, String fieldName) {
+        return IndexController.LogicIndicesRegister.getPhysicalColumnName(modelName, fieldName);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
index a118321..80fdc9f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndexController.java
@@ -23,15 +23,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.analysis.FunctionCategory;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
-import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 /**
@@ -45,6 +46,7 @@
      * Init in StorageModuleElasticsearchProvider.prepare() and the value from the config.
      */
     @Setter
+    @Getter
     private boolean logicSharding = false;
 
     public String getTableName(Model model) {
@@ -124,6 +126,8 @@
 
         private static final Map<String/*physical index name*/, Map<String/*column name*/, ModelColumn>> PHYSICAL_INDICES_COLUMNS = new HashMap<>();
 
+        private static final Map<String/*logic index name*/, Map<String/*column name*/, String/*alias*/>> LOGIC_INDICES_COLUMNS_ALIAS = new HashMap<>();
+
         /**
          * The metric table name in aggregation physical storage.
          */
@@ -141,15 +145,34 @@
         public static void registerRelation(Model model, String physicalName) {
             LOGIC_INDICES_CATALOG.put(model.getName(), physicalName);
             Map<String, ModelColumn> columns = PHYSICAL_INDICES_COLUMNS.computeIfAbsent(
-                physicalName, v -> new ConcurrentHashMap<>());
-            model.getColumns().forEach(modelColumn -> {
-                String columnName = modelColumn.getColumnName().getName();
-                if (columns.containsKey(columnName)) {
-                    checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
-                } else {
-                    columns.put(columnName, modelColumn);
-                }
-            });
+                physicalName, v -> new HashMap<>());
+            if (!IndexController.INSTANCE.logicSharding) {
+                model.getColumns().forEach(modelColumn -> {
+                    String columnName = modelColumn.getColumnName().getName();
+                    String alias = modelColumn.getElasticSearchExtension().getColumnAlias();
+                    if (alias != null) {
+                        Map<String, String> aliasMap = LOGIC_INDICES_COLUMNS_ALIAS.computeIfAbsent(
+                            model.getName(), v -> new HashMap<>());
+                        aliasMap.put(modelColumn.getColumnName().getName(), alias);
+                        columnName = alias;
+                    }
+                    if (columns.containsKey(columnName)) {
+                        checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
+                    } else {
+                        columns.put(columnName, modelColumn);
+                    }
+                });
+            } else {
+                model.getColumns().forEach(modelColumn -> {
+                    String columnName = modelColumn.getColumnName().getName();
+                    if (columns.containsKey(columnName)) {
+                        checkModelColumnConflicts(columns.get(columnName), modelColumn, physicalName);
+                    } else {
+                        columns.put(columnName, modelColumn);
+                    }
+                });
+            }
+
         }
 
         public static boolean isPhysicalTable(String logicName) {
@@ -161,6 +184,27 @@
             return new ArrayList<>(PHYSICAL_INDICES_COLUMNS.get(tableName).values());
         }
 
+        /**
+         * Get real physical column name by logic name.
+         * Warning: This is only used to solve the column has alias.
+         */
+        @Deprecated
+        public static String getPhysicalColumnName(String modelName, String columnName) {
+            if (IndexController.INSTANCE.logicSharding) {
+                return columnName;
+            }
+
+            Map<String, String> aliasMap = LOGIC_INDICES_COLUMNS_ALIAS.get(modelName);
+            if (CollectionUtils.isEmpty(aliasMap)) {
+                return columnName;
+            }
+
+            return aliasMap.getOrDefault(columnName, columnName);
+        }
+
+        /**
+         * Check the columns conflicts when they in one physical index
+         */
         private static void checkModelColumnConflicts(ModelColumn mc1, ModelColumn mc2, String physicalName) {
             if (!(mc1.isIndexOnly() == mc2.isIndexOnly())) {
                 throw new IllegalArgumentException(mc1.getColumnName() + " and " + mc2.getColumnName() + " isIndexOnly conflict in index: " + physicalName);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
index db862fa..9693c90 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ManagementEsDAO.java
@@ -23,7 +23,6 @@
 import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
 import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 
@@ -44,7 +43,7 @@
         if (exist) {
             return;
         }
-        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(managementData, toStorage);
         Map<String, Object> source =
             IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 5d1290f..0363669 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -33,7 +33,6 @@
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -72,7 +71,7 @@
             if (!indexIdsGroup.isEmpty()) {
                 final Optional<Documents> response = getClient().ids(indexIdsGroup);
                 response.ifPresent(documents -> documents.forEach(document -> {
-                    Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(document.getSource()));
+                    Metrics source = storageBuilder.storage2Entity(new ElasticSearchConverter.ToEntity(model.getName(), document.getSource()));
                     result.add(source);
                 }));
             }
@@ -89,7 +88,7 @@
                                              .collect(Collectors.toList());
                 final SearchResponse response = getClient().searchIDs(tableName, ids);
                 response.getHits().getHits().forEach(hit -> {
-                    Metrics source = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(hit.getSource()));
+                    Metrics source = storageBuilder.storage2Entity(new ElasticSearchConverter.ToEntity(model.getName(), hit.getSource()));
                     result.add(source);
                 });
             });
@@ -99,7 +98,7 @@
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
-        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
@@ -109,7 +108,7 @@
 
     @Override
     public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
-        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(metrics, toStorage);
         Map<String, Object> builder =
             IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
index e569b0a..811923c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/NoneStreamEsDAO.java
@@ -23,7 +23,6 @@
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 
@@ -41,7 +40,7 @@
 
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
-        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(noneStream, toStorage);
         Map<String, Object> builder =
             IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index 3633f2c..264007a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -23,7 +23,6 @@
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -39,7 +38,7 @@
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
         storageBuilder.entity2Storage(record, toStorage);
         Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
         String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 9a5751a..306709f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -233,13 +233,18 @@
         Mappings.Source source = new Mappings.Source();
         for (ModelColumn columnDefine : model.getColumns()) {
             final String type = columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType());
+            String columnName = columnDefine.getColumnName().getName();
+            String alias = columnDefine.getElasticSearchExtension().getColumnAlias();
+            if (!config.isLogicSharding() && alias != null) {
+                columnName = alias;
+            }
             if (columnDefine.getElasticSearchExtension().needMatchQuery()) {
-                String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
+                String matchCName = MatchCNameBuilder.INSTANCE.build(columnName);
 
                 Map<String, Object> originalColumn = new HashMap<>();
                 originalColumn.put("type", type);
                 originalColumn.put("copy_to", matchCName);
-                properties.put(columnDefine.getColumnName().getName(), originalColumn);
+                properties.put(columnName, originalColumn);
 
                 Map<String, Object> matchColumn = new HashMap<>();
                 matchColumn.put("type", "text");
@@ -252,11 +257,11 @@
                 if (columnDefine.isStorageOnly() && !"binary".equals(type)) {
                     column.put("index", false);
                 }
-                properties.put(columnDefine.getColumnName().getName(), column);
+                properties.put(columnName, column);
             }
 
             if (columnDefine.isIndexOnly()) {
-                source.getExcludes().add(columnDefine.getColumnName().getName());
+                source.getExcludes().add(columnName);
             }
         }
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
index 69b713a..7794e6b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressAliasEsDAO.java
@@ -31,9 +31,9 @@
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
@@ -81,7 +81,7 @@
                     for (SearchHit searchHit : results.getHits()) {
                         networkAddressAliases.add(
                             builder.storage2Entity(
-                                new HashMapConverter.ToEntity(searchHit.getSource())));
+                                new ElasticSearchConverter.ToEntity(NetworkAddressAlias.INDEX_NAME, searchHit.getSource())));
                     }
                     if (results.getHits().getTotal() < batchSize) {
                         break;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AlarmQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AlarmQueryEsDAO.java
index af90364..c25617d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AlarmQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AlarmQueryEsDAO.java
@@ -35,9 +35,9 @@
 import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
 import org.apache.skywalking.oap.server.core.query.type.Alarms;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
@@ -88,7 +88,7 @@
 
         for (SearchHit searchHit : response.getHits().getHits()) {
             AlarmRecord.Builder builder = new AlarmRecord.Builder();
-            AlarmRecord alarmRecord = builder.storage2Entity(new HashMapConverter.ToEntity(searchHit.getSource()));
+            AlarmRecord alarmRecord = builder.storage2Entity(new ElasticSearchConverter.ToEntity(AlarmRecord.INDEX_NAME, searchHit.getSource()));
 
             AlarmMessage message = new AlarmMessage();
             message.setId(String.valueOf(alarmRecord.getId0()));
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
index edb3046..e22d323 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingDataEsDAO.java
@@ -27,9 +27,9 @@
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
 import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
@@ -91,7 +91,7 @@
         for (SearchHit hit : response.getHits()) {
             final Map<String, Object> sourceAsMap = hit.getSource();
             final EBPFProfilingDataRecord.Builder builder = new EBPFProfilingDataRecord.Builder();
-            records.add(builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)));
+            records.add(builder.storage2Entity(new ElasticSearchConverter.ToEntity(EBPFProfilingDataRecord.INDEX_NAME, sourceAsMap)));
         }
         return records;
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
index 5d7bd48..82c885e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
@@ -32,11 +32,11 @@
 import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
 import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
 import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
@@ -138,7 +138,7 @@
     private EBPFProfilingTask parseTask(final SearchHit hit) {
         final Map<String, Object> sourceAsMap = hit.getSource();
         final EBPFProfilingTaskRecord.Builder builder = new EBPFProfilingTaskRecord.Builder();
-        final EBPFProfilingTaskRecord record = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+        final EBPFProfilingTaskRecord record = builder.storage2Entity(new ElasticSearchConverter.ToEntity(EBPFProfilingTaskRecord.INDEX_NAME, sourceAsMap));
 
         final EBPFProfilingTask task = new EBPFProfilingTask();
         task.setTaskId(record.getLogicalId());
@@ -161,4 +161,4 @@
         task.setLastUpdateTime(record.getLastUpdateTime());
         return task;
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 7265987..97b419a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -53,10 +53,10 @@
 import org.apache.skywalking.oap.server.core.query.type.Service;
 import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
@@ -67,6 +67,8 @@
 
     private final int queryMaxSize;
     private final int scrollingBatchSize;
+    private String endpointTrafficNameAlias;
+    private boolean aliasNameInit = false;
 
     public MetadataQueryEsDAO(
         ElasticSearchClient client,
@@ -194,6 +196,7 @@
     @Override
     public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit)
         throws IOException {
+        initColumnName();
         final String index = IndexController.LogicIndicesRegister.getPhysicalTableName(
             EndpointTraffic.INDEX_NAME);
 
@@ -202,7 +205,7 @@
                  .must(Query.term(EndpointTraffic.SERVICE_ID, serviceId));
 
         if (!Strings.isNullOrEmpty(keyword)) {
-            String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointTraffic.NAME);
+            String matchCName = MatchCNameBuilder.INSTANCE.build(endpointTrafficNameAlias);
             query.must(Query.match(matchCName, keyword));
         }
 
@@ -219,11 +222,11 @@
             Map<String, Object> sourceAsMap = searchHit.getSource();
 
             final EndpointTraffic endpointTraffic =
-                new EndpointTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+                new EndpointTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(EndpointTraffic.INDEX_NAME, sourceAsMap));
 
             Endpoint endpoint = new Endpoint();
             endpoint.setId(endpointTraffic.id());
-            endpoint.setName((String) sourceAsMap.get(EndpointTraffic.NAME));
+            endpoint.setName(endpointTraffic.getName());
             endpoints.add(endpoint);
         }
 
@@ -360,7 +363,7 @@
         for (SearchHit hit : response.getHits()) {
             final Map<String, Object> sourceAsMap = hit.getSource();
             final ServiceTraffic.Builder builder = new ServiceTraffic.Builder();
-            final ServiceTraffic serviceTraffic = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+            final ServiceTraffic serviceTraffic = builder.storage2Entity(new ElasticSearchConverter.ToEntity(ServiceTraffic.INDEX_NAME, sourceAsMap));
             String serviceName = serviceTraffic.getName();
             Service service = new Service();
             service.setId(serviceTraffic.getServiceId());
@@ -379,7 +382,7 @@
             Map<String, Object> sourceAsMap = searchHit.getSource();
 
             final InstanceTraffic instanceTraffic =
-                new InstanceTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+                new InstanceTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(InstanceTraffic.INDEX_NAME, sourceAsMap));
 
             ServiceInstance serviceInstance = new ServiceInstance();
             serviceInstance.setId(instanceTraffic.id());
@@ -411,7 +414,7 @@
             Map<String, Object> sourceAsMap = searchHit.getSource();
 
             final ProcessTraffic processTraffic =
-                new ProcessTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+                new ProcessTraffic.Builder().storage2Entity(new ElasticSearchConverter.ToEntity(ProcessTraffic.INDEX_NAME, sourceAsMap));
 
             Process process = new Process();
             process.setId(processTraffic.id());
@@ -443,4 +446,14 @@
         }
         return processes;
     }
+
+    /**
+     * When the index column use an alias, we should get it's real physical column name for query.
+     */
+    private void initColumnName() {
+        if (!aliasNameInit) {
+            this.endpointTrafficNameAlias = IndexController.LogicIndicesRegister.getPhysicalColumnName(EndpointTraffic.INDEX_NAME, EndpointTraffic.NAME);
+            aliasNameInit = true;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 6d194ee..f36cbc8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -36,7 +36,6 @@
 import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
-import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.PointOfTime;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
@@ -64,11 +63,12 @@
     public long readMetricsValue(final MetricsCondition condition,
                                  final String valueColumnName,
                                  final Duration duration) {
+        final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
         final SearchBuilder sourceBuilder = buildQuery(condition, duration);
         int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
         if (function == Function.Latest) {
-            return readMetricsValues(condition, valueColumnName, duration)
+            return readMetricsValues(condition, realValueColumn, duration)
                 .getValues().latestValue(defaultValue);
         }
 
@@ -78,7 +78,7 @@
                        .executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
                        .collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST)
                        .size(1);
-        functionAggregation(function, entityIdAggregation, valueColumnName);
+        functionAggregation(function, entityIdAggregation, realValueColumn);
 
         sourceBuilder.aggregation(entityIdAggregation);
 
@@ -93,7 +93,7 @@
             (List<Map<String, Object>>) idTerms.get("buckets");
 
         for (Map<String, Object> idBucket : buckets) {
-            final Map<String, Object> agg = (Map<String, Object>) idBucket.get(valueColumnName);
+            final Map<String, Object> agg = (Map<String, Object>) idBucket.get(realValueColumn);
             return ((Number) agg.get("value")).longValue();
         }
         return defaultValue;
@@ -103,6 +103,7 @@
     public MetricsValues readMetricsValues(final MetricsCondition condition,
                                            final String valueColumnName,
                                            final Duration duration) {
+        final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
         String tableName =
             IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
         final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
@@ -132,7 +133,7 @@
                 kvInt.setValue(0);
                 if (idMap.containsKey(id)) {
                     Map<String, Object> source = idMap.get(id);
-                    kvInt.setValue(((Number) source.getOrDefault(valueColumnName, 0)).longValue());
+                    kvInt.setValue(((Number) source.getOrDefault(realValueColumn, 0)).longValue());
                 } else {
                     kvInt.setValue(ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName()));
                 }
@@ -152,6 +153,7 @@
                                                         final String valueColumnName,
                                                         final List<String> labels,
                                                         final Duration duration) {
+        final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
         final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
         String tableName =
             IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
@@ -177,7 +179,7 @@
             for (final Document document : response.get()) {
                 idMap.put(
                     document.getId(),
-                    new DataTable((String) document.getSource().getOrDefault(valueColumnName, ""))
+                    new DataTable((String) document.getSource().getOrDefault(realValueColumn, ""))
                 );
             }
         }
@@ -188,6 +190,7 @@
     public HeatMap readHeatMap(final MetricsCondition condition,
                                final String valueColumnName,
                                final Duration duration) {
+        final String realValueColumn = IndexController.LogicIndicesRegister.getPhysicalColumnName(condition.getName(), valueColumnName);
         final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
         String tableName =
             IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
@@ -218,7 +221,7 @@
         for (String id : ids) {
             Map<String, Object> source = idMap.get(id);
             if (source != null) {
-                String value = (String) source.get(HistogramMetrics.DATASET);
+                String value = (String) source.get(realValueColumn);
                 heatMap.buildColumn(id, value, defaultValue);
             }
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileThreadSnapshotQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileThreadSnapshotQueryEsDAO.java
index 27e79bb..40dc449 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileThreadSnapshotQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileThreadSnapshotQueryEsDAO.java
@@ -39,10 +39,10 @@
 import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
 import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
@@ -170,7 +170,7 @@
         List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
         for (SearchHit searchHit : response.getHits().getHits()) {
             ProfileThreadSnapshotRecord record = builder.storage2Entity(
-                new HashMapConverter.ToEntity(searchHit.getSource()));
+                new ElasticSearchConverter.ToEntity(ProfileThreadSnapshotRecord.INDEX_NAME, searchHit.getSource()));
 
             result.add(record);
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TagAutoCompleteQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TagAutoCompleteQueryDAO.java
index c87e1b6..69e36d2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TagAutoCompleteQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TagAutoCompleteQueryDAO.java
@@ -33,9 +33,9 @@
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
 import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
@@ -103,7 +103,7 @@
         Set<String> tagValues = new HashSet<>();
         for (SearchHit searchHit : response.getHits().getHits()) {
             TagAutocompleteData tag = new TagAutocompleteData.Builder().storage2Entity(
-                new HashMapConverter.ToEntity(searchHit.getSource()));
+                new ElasticSearchConverter.ToEntity(TagAutocompleteData.INDEX_NAME, searchHit.getSource()));
             tagValues.add(tag.getTagValue());
         }
         return tagValues;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index 0b15f75..5401c67 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -40,11 +40,11 @@
 import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
 import org.apache.skywalking.oap.server.core.query.type.TraceState;
 import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
@@ -174,7 +174,7 @@
         List<SegmentRecord> segmentRecords = new ArrayList<>();
         for (SearchHit searchHit : response.getHits().getHits()) {
             SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
-                new HashMapConverter.ToEntity(searchHit.getSource()));
+                new ElasticSearchConverter.ToEntity(SegmentRecord.INDEX_NAME, searchHit.getSource()));
             segmentRecords.add(segmentRecord);
         }
         return segmentRecords;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
index f3cae9f..14146e8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/UITemplateManagementEsDAO.java
@@ -35,10 +35,10 @@
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
 import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 
@@ -64,7 +64,7 @@
             UITemplate.Builder builder = new UITemplate.Builder();
             SearchHit data = response.getHits().getHits().get(0);
             return new DashboardConfiguration().fromEntity(
-                builder.storage2Entity(new HashMapConverter.ToEntity(data.getSource())));
+                builder.storage2Entity(new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, data.getSource())));
         }
         return null;
     }
@@ -93,7 +93,7 @@
         for (SearchHit searchHit : response.getHits()) {
             Map<String, Object> sourceAsMap = searchHit.getSource();
 
-            final UITemplate uiTemplate = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
+            final UITemplate uiTemplate = builder.storage2Entity(new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, sourceAsMap));
             configs.add(new DashboardConfiguration().fromEntity(uiTemplate));
         }
         return configs;
@@ -111,7 +111,7 @@
                                            .build();
             }
 
-            final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+            final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
             builder.entity2Storage(uiTemplate, toStorage);
             getClient().forceInsert(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
             return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
@@ -134,7 +134,7 @@
                                            .message("Can't find the template").build();
             }
 
-            final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+            final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
             builder.entity2Storage(uiTemplate, toStorage);
             getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
             return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
@@ -151,10 +151,10 @@
         if (response.isPresent()) {
             final UITemplate.Builder builder = new UITemplate.Builder();
             final UITemplate uiTemplate = builder.storage2Entity(
-                new HashMapConverter.ToEntity(response.get().getSource()));
+                new ElasticSearchConverter.ToEntity(UITemplate.INDEX_NAME, response.get().getSource()));
             uiTemplate.setDisabled(BooleanUtils.TRUE);
 
-            final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+            final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(UITemplate.INDEX_NAME);
             builder.entity2Storage(uiTemplate, toStorage);
             getClient().forceUpdate(UITemplate.INDEX_NAME, uiTemplate.id(), toStorage.obtain());
             return TemplateChangeStatus.builder().status(true).id(id).build();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
index 8a129da..0b6545c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/zipkin/ZipkinQueryEsDAO.java
@@ -38,7 +38,6 @@
 import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceRelationTraffic;
 import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceSpanTraffic;
 import org.apache.skywalking.oap.server.core.zipkin.ZipkinServiceTraffic;
@@ -46,6 +45,7 @@
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
@@ -82,7 +82,7 @@
                 for (SearchHit searchHit : response.getHits()) {
                     Map<String, Object> sourceAsMap = searchHit.getSource();
                     ZipkinServiceTraffic record = new ZipkinServiceTraffic.Builder().storage2Entity(
-                        new HashMapConverter.ToEntity(sourceAsMap));
+                        new ElasticSearchConverter.ToEntity(ZipkinServiceTraffic.INDEX_NAME, sourceAsMap));
                     services.add(record.getServiceName());
                 }
                 if (services.size() < SCROLLING_BATCH_SIZE) {
@@ -110,7 +110,7 @@
         for (SearchHit searchHit : response.getHits()) {
             Map<String, Object> sourceAsMap = searchHit.getSource();
             ZipkinServiceRelationTraffic record = new ZipkinServiceRelationTraffic.Builder().storage2Entity(
-                new HashMapConverter.ToEntity(sourceAsMap));
+                new ElasticSearchConverter.ToEntity(ZipkinServiceRelationTraffic.INDEX_NAME, sourceAsMap));
             remoteServices.add(record.getRemoteServiceName());
         }
         return remoteServices;
@@ -129,7 +129,7 @@
         for (SearchHit searchHit : response.getHits()) {
             Map<String, Object> sourceAsMap = searchHit.getSource();
             ZipkinServiceSpanTraffic record = new ZipkinServiceSpanTraffic.Builder().storage2Entity(
-                new HashMapConverter.ToEntity(sourceAsMap));
+                new ElasticSearchConverter.ToEntity(ZipkinServiceSpanTraffic.INDEX_NAME, sourceAsMap));
             spanNames.add(record.getSpanName());
         }
         return spanNames;
@@ -151,7 +151,7 @@
                 for (SearchHit searchHit : response.getHits()) {
                     Map<String, Object> sourceAsMap = searchHit.getSource();
                     ZipkinSpanRecord record = new ZipkinSpanRecord.Builder().storage2Entity(
-                        new HashMapConverter.ToEntity(sourceAsMap));
+                        new ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
                     trace.add(buildSpanFromRecord(record));
                 }
                 if (response.getHits().getHits().size() < SCROLLING_BATCH_SIZE) {
@@ -261,7 +261,7 @@
         for (SearchHit searchHit : response.getHits()) {
             Map<String, Object> sourceAsMap = searchHit.getSource();
             ZipkinSpanRecord record = new ZipkinSpanRecord.Builder().storage2Entity(
-                new HashMapConverter.ToEntity(sourceAsMap));
+                new ElasticSearchConverter.ToEntity(ZipkinSpanRecord.INDEX_NAME, sourceAsMap));
             Span span = buildSpanFromRecord(record);
             String traceId = span.traceId();
             groupedByTraceId.putIfAbsent(traceId, new ArrayList<>());