Fix JDBC TTL to delete additional tables data. (#9508)
diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index f6a547b..ac642f1 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -564,7 +564,7 @@
run: |
echo "${{ matrix.test.env }}" >> $GITHUB_ENV
- name: ${{ matrix.test.name }}
- uses: apache/skywalking-infra-e2e@main
+ uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
with:
e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
- uses: actions/upload-artifact@v2
@@ -603,7 +603,7 @@
with:
go-version: '1.16'
- name: ${{ matrix.test.name }}
- uses: apache/skywalking-infra-e2e@main
+ uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
env:
ISTIO_VERSION: ${{ matrix.istio_version }}
ALS_ANALYZER: ${{ matrix.analyzer }}
@@ -645,7 +645,7 @@
with:
go-version: '1.16'
- name: Java version ${{ matrix.java-version }}
- uses: apache/skywalking-infra-e2e@main
+ uses: apache/skywalking-infra-e2e@d71bbba95f95d6629db199e4bc0c0e097ae4f2dc
env:
SW_AGENT_JDK_VERSION: ${{ matrix.java-version }}
with:
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 39251fb..d9e88d5 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -59,6 +59,9 @@
* [Breaking Change] rename configuration folder from `otel-oc-rules` to `otel-rules`.
* [Breaking Change] rename configuration field from `enabledOcRules` to `enabledOtelRules` and
environment variable name from `SW_OTEL_RECEIVER_ENABLED_OC_RULES` to `SW_OTEL_RECEIVER_ENABLED_OTEL_RULES`.
+* [Breaking Change] Fix JDBC TTL to delete additional tables data.
+ SQL Database requires removing `segment`,`segment_tag`, `logs`, `logs_tag`, `alarms`, `alarms_tag`, `zipkin_span`, `zipkin_query` before OAP starts.
+* SQL Database: add `@SQLDatabase.ExtraColumn4AdditionalEntity` to support add an extra column from parent to an additional table.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index f1d4d0e..178aaa5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -34,13 +34,14 @@
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
@Getter
@Setter
@ScopeDeclaration(id = ALARM, name = "Alarm")
@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
public class AlarmRecord extends Record {
public static final String INDEX_NAME = "alarm_record";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
index 7817647..33d51bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
@@ -23,12 +23,15 @@
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
@SuperDataset
@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AbstractLogRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
public class LogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "log";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index b55921e..a5c0bc0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -32,9 +32,11 @@
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
@SuperDataset
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = SegmentRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
public class SegmentRecord extends Record {
public static final String INDEX_NAME = "segment";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
index 8358711..4b081d3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/SQLDatabase.java
@@ -113,4 +113,26 @@
String[] additionalTables();
boolean reserveOriginalColumns() default false;
}
+
+ /**
+ * Support add an extra column from the parent classes as a column of the additional table.
+ * This column would be created in both the primary and additional tables.
+ * Notice: This annotation should be declared on the leaf subclasses.
+ */
+ @Target({ElementType.TYPE})
+ @Retention(RetentionPolicy.RUNTIME)
+ @Repeatable(MultipleExtraColumn4AdditionalEntity.class)
+ @interface ExtraColumn4AdditionalEntity {
+ String additionalTable();
+ String parentColumn();
+ }
+
+ /**
+ * The support of the multiple {@link ExtraColumn4AdditionalEntity}s on the class.
+ */
+ @Target({ElementType.TYPE})
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface MultipleExtraColumn4AdditionalEntity {
+ ExtraColumn4AdditionalEntity[] value();
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2d85c2e..5457648 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@@ -59,6 +60,32 @@
ShardingKeyChecker checker = new ShardingKeyChecker();
SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
+ // Add extra column for additional entities
+ if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
+ || aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
+ Map<String/*parent column*/, List<String>/*tables*/> extraColumns = new HashMap<>();
+ if (aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
+ for (SQLDatabase.ExtraColumn4AdditionalEntity extraColumn : aClass.getAnnotation(
+ SQLDatabase.MultipleExtraColumn4AdditionalEntity.class).value()) {
+ List<String> tables = extraColumns.computeIfAbsent(
+ extraColumn.parentColumn(), v -> new ArrayList<>());
+ tables.add(extraColumn.additionalTable());
+ }
+ } else {
+ SQLDatabase.ExtraColumn4AdditionalEntity extraColumn = aClass.getAnnotation(
+ SQLDatabase.ExtraColumn4AdditionalEntity.class);
+ List<String> tables = extraColumns.computeIfAbsent(extraColumn.parentColumn(), v -> new ArrayList<>());
+ tables.add(extraColumn.additionalTable());
+ }
+
+ extraColumns.forEach((extraColumn, tables) -> {
+ if (!addExtraColumn4AdditionalEntity(sqlDBModelExtension, modelColumns, extraColumn, tables)) {
+ throw new IllegalStateException(
+ "Model [" + storage.getModelName() + "] defined an extra column [" + extraColumn + "] by @SQLDatabase.ExtraColumn4AdditionalEntity, " +
+ "but couldn't be found from the parent.");
+ }
+ });
+ }
checker.check(storage.getModelName());
Model model = new Model(
@@ -246,6 +273,20 @@
});
}
+ private boolean addExtraColumn4AdditionalEntity(SQLDatabaseModelExtension sqlDBModelExtension,
+ List<ModelColumn> modelColumns,
+ String extraColumn, List<String> additionalTables) {
+ for (ModelColumn modelColumn : modelColumns) {
+ if (modelColumn.getColumnName().getName().equals(extraColumn)) {
+ additionalTables.forEach(tableName -> {
+ sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn);
+ });
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public List<Model> allModels() {
return models;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 5bc7287..1a2983c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -37,9 +37,11 @@
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
+import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
@SuperDataset
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
+@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE, parentColumn = TIME_BUCKET)
public class ZipkinSpanRecord extends Record {
private static final Gson GSON = new Gson();
public static final String INDEX_NAME = "zipkin_span";
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
index 9344ba2..a28820d 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
@@ -86,9 +86,9 @@
parameters.add(scopeId.intValue());
}
if (startTB != 0 && endTB != 0) {
- sql.append(" and ").append(AlarmRecord.TIME_BUCKET).append(" >= ?");
+ sql.append(" and ").append(AlarmRecord.INDEX_NAME).append(".").append(AlarmRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startTB);
- sql.append(" and ").append(AlarmRecord.TIME_BUCKET).append(" <= ?");
+ sql.append(" and ").append(AlarmRecord.INDEX_NAME).append(".").append(AlarmRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endTB);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
index bac373d..7413b8e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
@@ -23,6 +23,7 @@
import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
@@ -68,6 +69,16 @@
}
}
client.executeUpdate(connection, dataDeleteSQL.toString(), deadline, minTime);
+ //delete additional tables
+ for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
+ .getAdditionalTables()
+ .values()) {
+ SQLBuilder additionalTableDeleteSQL = new SQLBuilder("delete from " + additionalTable.getName() + " where ")
+ .append(timeBucketColumnName).append("<= ? ")
+ .append(" and ")
+ .append(timeBucketColumnName).append(">= ? ");
+ client.executeUpdate(connection, additionalTableDeleteSQL.toString(), deadline, minTime);
+ }
} catch (JDBCClientException | SQLException e) {
throw new IOException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
index 09a5c93..e3b10d7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2LogQueryDAO.java
@@ -105,9 +105,9 @@
sql.append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
- sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" >= ?");
+ sql.append(" and ").append(LogRecord.INDEX_NAME).append(".").append(AbstractLogRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
- sql.append(" and ").append(AbstractLogRecord.TIME_BUCKET).append(" <= ?");
+ sql.append(" and ").append(LogRecord.INDEX_NAME).append(".").append(AbstractLogRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index edf81b3..8a93a5e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -99,9 +99,9 @@
sql.append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
- sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
+ sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
- sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
+ sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
if (minDuration != 0) {
diff --git a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
index 5c0ea26..d876fbb 100644
--- a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBHistoryDeleteDAO.java
@@ -23,6 +23,7 @@
import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
@@ -70,6 +71,18 @@
}
while (client.executeUpdate(connection, dataDeleteSQL.toString(), deadline, minTime) > 0) {
}
+ //delete additional tables
+ for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
+ .getAdditionalTables()
+ .values()) {
+ SQLBuilder additionalTableDeleteSQL = new SQLBuilder("delete from " + additionalTable.getName() + " where ")
+ .append(timeBucketColumnName).append("<= ? ")
+ .append(" and ")
+ .append(timeBucketColumnName).append(">= ? ")
+ .append(" limit 10000");
+ while (client.executeUpdate(connection, additionalTableDeleteSQL.toString(), deadline, minTime) > 0) {
+ }
+ }
} catch (JDBCClientException | SQLException e) {
throw new IOException(e.getMessage(), e);
}