Fix JDBC TTL to delete additional tables data. (#9508)

diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index f6a547b..ac642f1 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -564,7 +564,7 @@
         run: |
           echo "${{ matrix.test.env }}"  >> $GITHUB_ENV
       - name: ${{ matrix.test.name }}
-        uses: apache/skywalking-infra-e2e@main
+        uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
         with:
           e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
       - uses: actions/upload-artifact@v2
@@ -603,7 +603,7 @@
         with:
           go-version: '1.16'
       - name: ${{ matrix.test.name }}
-        uses: apache/skywalking-infra-e2e@main
+        uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
         env:
           ISTIO_VERSION: ${{ matrix.istio_version }}
           ALS_ANALYZER: ${{ matrix.analyzer }}
@@ -645,7 +645,7 @@
         with:
           go-version: '1.16'
       - name: Java version ${{ matrix.java-version }}
-        uses: apache/skywalking-infra-e2e@main
+        uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
         env:
           SW_AGENT_JDK_VERSION: ${{ matrix.java-version }}
         with:
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 39251fb..d9e88d5 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -59,6 +59,9 @@
 * [Breaking Change] rename configuration folder from `otel-oc-rules` to `otel-rules`.
 * [Breaking Change] rename configuration field from `enabledOcRules` to `enabledOtelRules` and
   environment variable name from `SW_OTEL_RECEIVER_ENABLED_OC_RULES` to `SW_OTEL_RECEIVER_ENABLED_OTEL_RULES`.
+* [Breaking Change] Fix JDBC TTL to delete additional tables data. 
+  SQL Database requires removing `segment`,`segment_tag`, `logs`, `logs_tag`, `alarms`, `alarms_tag`, `zipkin_span`, `zipkin_query` before OAP starts.
+* SQL Database: add `@SQLDatabase.ExtraColumn4AdditionalEntity` to support add an extra column from parent to an additional table.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index f1d4d0e..178aaa5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -34,13 +34,14 @@
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
 import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
 
 @Getter
 @Setter
 @ScopeDeclaration(id = ALARM, name = "Alarm")
 @Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 public class AlarmRecord extends Record {
 
     public static final String INDEX_NAME = "alarm_record";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
index 7817647..33d51bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
@@ -23,12 +23,15 @@
 import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
 
 @SuperDataset
 @Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AbstractLogRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 public class LogRecord extends AbstractLogRecord {
 
     public static final String INDEX_NAME = "log";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index b55921e..a5c0bc0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -32,9 +32,11 @@
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
 
 @SuperDataset
 @Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = SegmentRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 public class SegmentRecord extends Record {
 
     public static final String INDEX_NAME = "segment";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
index 8358711..4b081d3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
@@ -113,4 +113,26 @@
         String[] additionalTables();
         boolean reserveOriginalColumns() default false;
     }
+
+    /**
+     * Support add an extra column from the parent classes as a column of the additional table.
+     * This column would be created in both the primary and additional tables.
+     * Notice: This annotation should be declared on the leaf subclasses.
+     */
+    @Target({ElementType.TYPE})
+    @Retention(RetentionPolicy.RUNTIME)
+    @Repeatable(MultipleExtraColumn4AdditionalEntity.class)
+    @interface ExtraColumn4AdditionalEntity {
+        String additionalTable();
+        String parentColumn();
+    }
+
+    /**
+     * The support of the multiple {@link ExtraColumn4AdditionalEntity}s on the class.
+     */
+    @Target({ElementType.TYPE})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface MultipleExtraColumn4AdditionalEntity {
+        ExtraColumn4AdditionalEntity[] value();
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2d85c2e..5457648 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@@ -59,6 +60,32 @@
         ShardingKeyChecker checker = new ShardingKeyChecker();
         SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
         retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
+        // Add extra column for additional entities
+        if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
+            || aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
+            Map<String/*parent column*/, List<String>/*tables*/> extraColumns = new HashMap<>();
+            if (aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
+                for (SQLDatabase.ExtraColumn4AdditionalEntity extraColumn : aClass.getAnnotation(
+                    SQLDatabase.MultipleExtraColumn4AdditionalEntity.class).value()) {
+                    List<String> tables = extraColumns.computeIfAbsent(
+                        extraColumn.parentColumn(), v -> new ArrayList<>());
+                    tables.add(extraColumn.additionalTable());
+                }
+            } else {
+                SQLDatabase.ExtraColumn4AdditionalEntity extraColumn = aClass.getAnnotation(
+                    SQLDatabase.ExtraColumn4AdditionalEntity.class);
+                List<String> tables = extraColumns.computeIfAbsent(extraColumn.parentColumn(), v -> new ArrayList<>());
+                tables.add(extraColumn.additionalTable());
+            }
+
+            extraColumns.forEach((extraColumn, tables) -> {
+                if (!addExtraColumn4AdditionalEntity(sqlDBModelExtension, modelColumns, extraColumn, tables)) {
+                    throw new IllegalStateException(
+                        "Model [" + storage.getModelName() + "] defined an extra column  [" + extraColumn + "]  by @SQLDatabase.ExtraColumn4AdditionalEntity, " +
+                            "but couldn't be found from the parent.");
+                }
+            });
+        }
         checker.check(storage.getModelName());
 
         Model model = new Model(
@@ -246,6 +273,20 @@
         });
     }
 
+    private boolean addExtraColumn4AdditionalEntity(SQLDatabaseModelExtension sqlDBModelExtension,
+                                                    List<ModelColumn> modelColumns,
+                                                    String extraColumn, List<String> additionalTables) {
+        for (ModelColumn modelColumn : modelColumns) {
+            if (modelColumn.getColumnName().getName().equals(extraColumn)) {
+                additionalTables.forEach(tableName -> {
+                    sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn);
+                });
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public List<Model> allModels() {
         return models;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 5bc7287..1a2983c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -37,9 +37,11 @@
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
 
 @SuperDataset
 @Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE, parentColumn = TIME_BUCKET)
 public class ZipkinSpanRecord extends Record {
     private static final Gson GSON = new Gson();
     public static final String INDEX_NAME = "zipkin_span";
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
index 9344ba2..a28820d 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
@@ -86,9 +86,9 @@
             parameters.add(scopeId.intValue());
         }
         if (startTB != 0 && endTB != 0) {
-            sql.append(" and ").append(AlarmRecord.TIME_BUCKET).append(" >= ?");
+            sql.append(" and ").append(AlarmRecord.INDEX_NAME).append(".").append(AlarmRecord.TIME_BUCKET).append(" >= ?");
             parameters.add(startTB);
-            sql.append(" and ").append(AlarmRecord.TIME_BUCKET).append(" <= ?");
+            sql.append(" and ").append(AlarmRecord.INDEX_NAME).append(".").append(AlarmRecord.TIME_BUCKET).append(" <= ?");
             parameters.add(endTB);
         }
 
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
index bac373d..7413b8e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
@@ -23,6 +23,7 @@
 import java.sql.SQLException;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
@@ -68,6 +69,16 @@
                 }
             }
             client.executeUpdate(connection, dataDeleteSQL.toString(), deadline, minTime);
+            //delete additional tables
+            for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
+                                                                                  .getAdditionalTables()
+                                                                                  .values()) {
+                SQLBuilder additionalTableDeleteSQL = new SQLBuilder("delete from " + additionalTable.getName() + " where ")
+                    .append(timeBucketColumnName).append("<= ? ")
+                    .append(" and ")
+                    .append(timeBucketColumnName).append(">= ? ");
+                client.executeUpdate(connection, additionalTableDeleteSQL.toString(), deadline, minTime);
+            }
         } catch (JDBCClientException | SQLException e) {
             throw new IOException(e.getMessage(), e);
         }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
index 09a5c93..e3b10d7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
@@ -105,9 +105,9 @@
         sql.append(" where ");
         sql.append(" 1=1 ");
         if (startSecondTB != 0 && endSecondTB != 0) {
-            sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" >= ?");
+            sql.append(" and ").append(LogRecord.INDEX_NAME).append(".").append(AbstractLogRecord.TIME_BUCKET).append(" >= ?");
             parameters.add(startSecondTB);
-            sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" <= ?");
+            sql.append(" and ").append(LogRecord.INDEX_NAME).append(".").append(AbstractLogRecord.TIME_BUCKET).append(" <= ?");
             parameters.add(endSecondTB);
         }
 
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index edf81b3..8a93a5e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -99,9 +99,9 @@
         sql.append(" where ");
         sql.append(" 1=1 ");
         if (startSecondTB != 0 && endSecondTB != 0) {
-            sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
+            sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
             parameters.add(startSecondTB);
-            sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
+            sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
             parameters.add(endSecondTB);
         }
         if (minDuration != 0) {
diff --git a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
index 5c0ea26..d876fbb 100644
--- a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
@@ -23,6 +23,7 @@
 import java.sql.SQLException;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
@@ -70,6 +71,18 @@
             }
             while (client.executeUpdate(connection, dataDeleteSQL.toString(), deadline, minTime) > 0) {
             }
+            //delete additional tables
+            for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
+                                                                                  .getAdditionalTables()
+                                                                                  .values()) {
+                SQLBuilder additionalTableDeleteSQL = new SQLBuilder("delete from " + additionalTable.getName() + " where ")
+                    .append(timeBucketColumnName).append("<= ? ")
+                    .append(" and ")
+                    .append(timeBucketColumnName).append(">= ? ")
+                    .append(" limit 10000");
+                while (client.executeUpdate(connection, additionalTableDeleteSQL.toString(), deadline, minTime) > 0) {
+                }
+            }
         } catch (JDBCClientException | SQLException e) {
             throw new IOException(e.getMessage(), e);
         }