Add feature to automatically remove audit logs based on retention period (#11084)
* add docs
* add impl
* fix checkstyle
* fix test
* add test
* fix checkstyle
* fix checkstyle
* fix test
* Address comments
* Address comments
* fix spelling
* fix docs
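
The period gating and retention cutoff that the new `KillAuditLog` duty applies can be sketched in isolation. This is a minimal, hypothetical illustration and not the code in this commit: the class name `RetentionGateSketch`, the injected `clock`, and the `remover` callback are assumptions introduced for the example. Only the `lastRun + period < now` check and the `now - retainDuration` cutoff mirror the diff below.

```java
import java.util.function.LongSupplier;
import java.util.function.LongToIntFunction;

// Hypothetical sketch: stands in for a coordinator duty that is invoked more
// often than it should actually do work, and that deletes rows older than a
// retention window. None of these names exist in Druid.
final class RetentionGateSketch
{
  private final long periodMillis;
  private final long retainMillis;
  private final LongSupplier clock;          // assumed: supplies "now" in epoch millis
  private final LongToIntFunction remover;   // assumed: deletes entries older than the cutoff, returns count
  private long lastRunMillis = 0;

  RetentionGateSketch(long periodMillis, long retainMillis, LongSupplier clock, LongToIntFunction remover)
  {
    if (retainMillis < 0) {
      throw new IllegalArgumentException("retainMillis must be >= 0");
    }
    this.periodMillis = periodMillis;
    this.retainMillis = retainMillis;
    this.clock = clock;
    this.remover = remover;
  }

  // Returns the number of removed entries, or -1 if the period has not elapsed yet.
  int runOnce()
  {
    final long now = clock.getAsLong();
    if (lastRunMillis + periodMillis >= now) {
      return -1;
    }
    lastRunMillis = now;
    // Everything created before (now - retainMillis) is eligible for removal.
    return remover.applyAsInt(now - retainMillis);
  }
}
```

Injecting the clock is a choice made for the sketch so the gating can be exercised without waiting; the duty in this commit reads `System.currentTimeMillis()` directly.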
diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java b/core/src/main/java/org/apache/druid/audit/AuditManager.java
index 73804d7..f403423 100644
--- a/core/src/main/java/org/apache/druid/audit/AuditManager.java
+++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java
@@ -91,4 +91,12 @@
* @return list of AuditEntries satisfying the passed parameters
*/
List<AuditEntry> fetchAuditHistory(String type, int limit);
+
+ /**
+ * Remove audit logs created before the given timestamp.
+ *
+ * @param timestamp timestamp in milliseconds
+ * @return number of audit logs removed
+ */
+ int removeAuditLogsOlderThan(long timestamp);
}
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cd26a9e..d49bf75 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -742,6 +742,15 @@
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator process should act like an Overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone Overlord processes. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord processes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
+##### Metadata Management
+
+|Property|Description|Required?|Default|
+|--------|-----------|---------|-------|
+|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run metadata management tasks in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. |No | `PT1H`|
+|`druid.coordinator.kill.audit.on`| Boolean value for whether to enable automatic deletion of audit logs. If set to true, the Coordinator periodically removes audit log entries from the audit table in metadata storage.| No | False|
+|`druid.coordinator.kill.audit.period`| How often to automatically delete audit logs, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be greater than or equal to `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.audit.on` is set to True.| No| `P1D`|
+|`druid.coordinator.kill.audit.durationToRetain`| How long to retain audit logs after their creation time, in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Must be zero or positive. Only applies if `druid.coordinator.kill.audit.on` is set to True.| Yes if `druid.coordinator.kill.audit.on` is set to True| None|
+
##### Segment Management
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 9a1d2c7..5a2d5fb 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -256,6 +256,8 @@
|`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
|`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|
+|`metadata/kill/audit/count`|Total number of audit logs automatically deleted from the metadata store audit table in each run of the Coordinator kill audit duty. This metric can help you adjust the `druid.coordinator.kill.audit.durationToRetain` configuration based on whether more or fewer audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.|
+
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class
`org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have extra information on balancing
diff --git a/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
new file mode 100644
index 0000000..47cde2b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CoordinatorMetadataStoreManagementDuty
+{
+}
diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
index 1302043..91a5a21 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
+++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
@@ -41,6 +41,7 @@
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
@@ -229,6 +230,24 @@
return fetchAuditHistoryLastEntries(null, type, limit);
}
+ @Override
+ public int removeAuditLogsOlderThan(final long timestamp)
+ {
+ DateTime dateTime = DateTimes.utc(timestamp);
+ return dbi.withHandle(
+ handle -> {
+ Update sql = handle.createStatement(
+ StringUtils.format(
+ "DELETE FROM %s WHERE created_date < :date_time",
+ getAuditTable()
+ )
+ );
+ return sql.bind("date_time", dateTime.toString())
+ .execute();
+ }
+ );
+ }
+
private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, final String type, int limit)
throws IllegalArgumentException
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 17a3794..7da4fe6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -47,6 +47,7 @@
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@@ -150,6 +151,7 @@
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private final Set<CoordinatorDuty> indexingServiceDuties;
+ private final Set<CoordinatorDuty> metadataStoreManagementDuties;
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
@@ -164,6 +166,7 @@
private ListeningExecutorService balancerExec;
private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
+ private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
@@ -182,6 +185,7 @@
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
+ @CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties,
@CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@@ -206,6 +210,7 @@
self,
new ConcurrentHashMap<>(),
indexingServiceDuties,
+ metadataStoreManagementDuties,
factory,
lookupCoordinatorManager,
coordLeaderSelector,
@@ -230,6 +235,7 @@
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<CoordinatorDuty> indexingServiceDuties,
+ Set<CoordinatorDuty> metadataStoreManagementDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
@@ -255,6 +261,7 @@
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.indexingServiceDuties = indexingServiceDuties;
+ this.metadataStoreManagementDuties = metadataStoreManagementDuties;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@@ -665,6 +672,12 @@
)
);
}
+ dutiesRunnables.add(
+ Pair.of(
+ new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+ config.getCoordinatorMetadataStoreManagementPeriod()
+ )
+ );
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
// CompactSegmentsDuty can takes a non trival amount of time to complete.
@@ -750,6 +763,19 @@
return ImmutableList.copyOf(duties);
}
+ private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
+ {
+ List<CoordinatorDuty> duties = ImmutableList.<CoordinatorDuty>builder()
+ .addAll(metadataStoreManagementDuties)
+ .build();
+
+ log.debug(
+ "Done making metadata store management duties %s",
+ duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
+ );
+ return ImmutableList.copyOf(duties);
+ }
+
private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 130d7e8..933b974 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -39,6 +39,10 @@
@Default("PT1800s")
public abstract Duration getCoordinatorIndexingPeriod();
+ @Config("druid.coordinator.period.metadataStoreManagementPeriod")
+ @Default("PT1H")
+ public abstract Duration getCoordinatorMetadataStoreManagementPeriod();
+
@Config("druid.coordinator.kill.period")
@Default("P1D")
public abstract Duration getCoordinatorKillPeriod();
@@ -51,6 +55,14 @@
@Default("0")
public abstract int getCoordinatorKillMaxSegments();
+ @Config("druid.coordinator.kill.audit.period")
+ @Default("P1D")
+ public abstract Duration getCoordinatorAuditKillPeriod();
+
+ @Config("druid.coordinator.kill.audit.durationToRetain")
+ @Default("PT-1s")
+ public abstract Duration getCoordinatorAuditKillDurationToRetain();
+
@Config("druid.coordinator.load.timeout")
public Duration getLoadTimeoutDelay()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
new file mode 100644
index 0000000..651cf6b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+
+public class KillAuditLog implements CoordinatorDuty
+{
+ private static final Logger log = new Logger(KillAuditLog.class);
+
+ private final long period;
+ private final long retainDuration;
+ private long lastKillTime = 0;
+
+ private final AuditManager auditManager;
+
+ @Inject
+ public KillAuditLog(
+ AuditManager auditManager,
+ DruidCoordinatorConfig config
+ )
+ {
+ this.period = config.getCoordinatorAuditKillPeriod().getMillis();
+ Preconditions.checkArgument(
+ this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
+ "coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
+ );
+ this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
+ Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
+ log.debug(
+ "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
+ this.period,
+ this.retainDuration
+ );
+ this.auditManager = auditManager;
+ }
+
+ @Override
+ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+ {
+ final long currentTimeMillis = System.currentTimeMillis();
+ if ((lastKillTime + period) < currentTimeMillis) {
+ lastKillTime = currentTimeMillis;
+ long timestamp = currentTimeMillis - retainDuration;
+ int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
+ ServiceEmitter emitter = params.getEmitter();
+ emitter.emit(
+ new ServiceMetricEvent.Builder().build(
+ "metadata/kill/audit/count",
+ auditRemoved
+ )
+ );
+ log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
+ }
+ return params;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 429402e..d336e84 100644
--- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -128,15 +128,15 @@
public void testAuditMetricEventBuilderConfig()
{
AuditEntry entry = new AuditEntry(
- "testKey",
- "testType",
- new AuditInfo(
- "testAuthor",
- "testComment",
- "127.0.0.1"
- ),
- "testPayload",
- DateTimes.of("2013-01-01T00:00:00Z")
+ "testKey",
+ "testType",
+ new AuditInfo(
+ "testAuthor",
+ "testComment",
+ "127.0.0.1"
+ ),
+ "testPayload",
+ DateTimes.of("2013-01-01T00:00:00Z")
);
SQLAuditManager auditManagerWithPayloadAsDimension = new SQLAuditManager(
@@ -257,6 +257,83 @@
}
@Test(timeout = 60_000L)
+ public void testRemoveAuditLogsOlderThanWithEntryOlderThanTime() throws IOException
+ {
+ String entry1Key = "testKey";
+ String entry1Type = "testType";
+ AuditInfo entry1AuditInfo = new AuditInfo(
+ "testAuthor",
+ "testComment",
+ "127.0.0.1"
+ );
+ String entry1Payload = "testPayload";
+
+ auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+ byte[] payload = connector.lookup(
+ derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+ "audit_key",
+ "payload",
+ "testKey"
+ );
+ AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+ Assert.assertEquals(entry1Key, dbEntry.getKey());
+ Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+ Assert.assertEquals(entry1Type, dbEntry.getType());
+ Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+
+ // Do delete
+ auditManager.removeAuditLogsOlderThan(System.currentTimeMillis());
+ // Verify the delete
+ payload = connector.lookup(
+ derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+ "audit_key",
+ "payload",
+ "testKey"
+ );
+ Assert.assertNull(payload);
+ }
+
+ @Test(timeout = 60_000L)
+ public void testRemoveAuditLogsOlderThanWithEntryNotOlderThanTime() throws IOException
+ {
+ String entry1Key = "testKey";
+ String entry1Type = "testType";
+ AuditInfo entry1AuditInfo = new AuditInfo(
+ "testAuthor",
+ "testComment",
+ "127.0.0.1"
+ );
+ String entry1Payload = "testPayload";
+
+ auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+ byte[] payload = connector.lookup(
+ derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+ "audit_key",
+ "payload",
+ "testKey"
+ );
+ AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+ Assert.assertEquals(entry1Key, dbEntry.getKey());
+ Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+ Assert.assertEquals(entry1Type, dbEntry.getType());
+ Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+ // Do delete
+ auditManager.removeAuditLogsOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis());
+ // Verify that the entry was not deleted
+ payload = connector.lookup(
+ derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+ "audit_key",
+ "payload",
+ "testKey"
+ );
+ dbEntry = mapper.readValue(payload, AuditEntry.class);
+ Assert.assertEquals(entry1Key, dbEntry.getKey());
+ Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+ Assert.assertEquals(entry1Type, dbEntry.getType());
+ Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+ }
+
+ @Test(timeout = 60_000L)
public void testFetchAuditHistoryByTypeWithLimit()
{
String entry1Key = "testKey";
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index dd382e3..a8deb66 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -169,8 +169,11 @@
new Duration(COORDINATOR_PERIOD),
null,
null,
+ null,
new Duration(COORDINATOR_PERIOD),
null,
+ null,
+ null,
10,
new Duration("PT0s")
);
@@ -247,6 +250,7 @@
druidNode,
loadManagementPeons,
null,
+ null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@@ -546,6 +550,7 @@
druidNode,
loadManagementPeons,
null,
+ null,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 36f1bf6..3aaef64 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -141,8 +141,11 @@
new Duration(COORDINATOR_PERIOD),
null,
null,
+ null,
new Duration(COORDINATOR_PERIOD),
null,
+ null,
+ null,
10,
new Duration("PT0s")
);
@@ -212,6 +215,7 @@
druidNode,
loadManagementPeons,
null,
+ new HashSet<>(),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@@ -738,6 +742,7 @@
null,
null,
null,
+ null,
ZkEnablementConfig.ENABLED
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 10d7ba2..7b07d7f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -80,6 +80,9 @@
null,
null,
null,
+ null,
+ null,
+ null,
10,
Duration.ZERO
)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 4a2a25f..24e6806 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -95,6 +95,9 @@
null,
null,
null,
+ null,
+ null,
+ null,
10,
Duration.millis(0)
)
@@ -287,9 +290,12 @@
null,
null,
null,
+ null,
new Duration(1),
null,
null,
+ null,
+ null,
10,
new Duration("PT1s")
)
@@ -339,9 +345,12 @@
null,
null,
null,
+ null,
new Duration(1),
null,
null,
+ null,
+ null,
10,
new Duration("PT1s")
)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index fb92873..5185452 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -41,9 +41,12 @@
null,
null,
null,
+ null,
new Duration(1),
null,
null,
+ null,
+ null,
10,
new Duration("PT1s")
)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index fef244a..135f8d0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -27,9 +27,12 @@
private final Duration coordinatorStartDelay;
private final Duration coordinatorPeriod;
private final Duration coordinatorIndexingPeriod;
+ private final Duration metadataStoreManagementPeriod;
private final Duration loadTimeoutDelay;
private final Duration coordinatorKillPeriod;
private final Duration coordinatorKillDurationToRetain;
+ private final Duration coordinatorAuditKillPeriod;
+ private final Duration coordinatorAuditKillDurationToRetain;
private final Duration getLoadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
@@ -37,9 +40,12 @@
Duration coordinatorStartDelay,
Duration coordinatorPeriod,
Duration coordinatorIndexingPeriod,
+ Duration metadataStoreManagementPeriod,
Duration loadTimeoutDelay,
Duration coordinatorKillPeriod,
Duration coordinatorKillDurationToRetain,
+ Duration coordinatorAuditKillPeriod,
+ Duration coordinatorAuditKillDurationToRetain,
int coordinatorKillMaxSegments,
Duration getLoadQueuePeonRepeatDelay
)
@@ -47,9 +53,12 @@
this.coordinatorStartDelay = coordinatorStartDelay;
this.coordinatorPeriod = coordinatorPeriod;
this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
+ this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
this.loadTimeoutDelay = loadTimeoutDelay;
this.coordinatorKillPeriod = coordinatorKillPeriod;
this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
+ this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
+ this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
}
@@ -73,6 +82,12 @@
}
@Override
+ public Duration getCoordinatorMetadataStoreManagementPeriod()
+ {
+ return metadataStoreManagementPeriod;
+ }
+
+ @Override
public Duration getCoordinatorKillPeriod()
{
return coordinatorKillPeriod;
@@ -85,6 +100,18 @@
}
@Override
+ public Duration getCoordinatorAuditKillPeriod()
+ {
+ return coordinatorAuditKillPeriod;
+ }
+
+ @Override
+ public Duration getCoordinatorAuditKillDurationToRetain()
+ {
+ return coordinatorAuditKillDurationToRetain;
+ }
+
+ @Override
public int getCoordinatorKillMaxSegments()
{
return coordinatorKillMaxSegments;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
new file mode 100644
index 0000000..b0f273c
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KillAuditLogTest
+{
+ @Mock
+ private AuditManager mockAuditManager;
+
+ @Mock
+ private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+
+ @Mock
+ private ServiceEmitter mockServiceEmitter;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ private KillAuditLog killAuditLog;
+
+ @Test
+ public void testRunSkipIfLastRunLessThanPeriod()
+ {
+ TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+ null,
+ null,
+ null,
+ new Duration("PT5S"),
+ null,
+ null,
+ null,
+ new Duration(Long.MAX_VALUE),
+ new Duration("PT1S"),
+ 10,
+ null
+ );
+ killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+ killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+ Mockito.verifyZeroInteractions(mockAuditManager);
+ }
+
+ @Test
+ public void testRunNotSkipIfLastRunMoreThanPeriod()
+ {
+ Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+ TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+ null,
+ null,
+ null,
+ new Duration("PT5S"),
+ null,
+ null,
+ null,
+ new Duration("PT6S"),
+ new Duration("PT1S"),
+ 10,
+ null
+ );
+ killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+ killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+ Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
+ Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ }
+
+ @Test
+ public void testConstructorFailIfInvalidPeriod()
+ {
+ TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+ null,
+ null,
+ null,
+ new Duration("PT5S"),
+ null,
+ null,
+ null,
+ new Duration("PT3S"),
+ new Duration("PT1S"),
+ 10,
+ null
+ );
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
+ killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+ }
+
+ @Test
+ public void testConstructorFailIfInvalidRetainDuration()
+ {
+ TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+ null,
+ null,
+ null,
+ new Duration("PT5S"),
+ null,
+ null,
+ null,
+ new Duration("PT6S"),
+ new Duration("PT-1S"),
+ 10,
+ null
+ );
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("coordinator audit kill retainDuration must be >= 0");
+ killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 38aa78e..cd76666 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -105,9 +105,12 @@
null,
null,
Duration.parse("PT76400S"),
+ null,
new Duration(1),
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
+ null,
+ null,
1000,
Duration.ZERO
)
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 1876a28..9e3c65b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -46,6 +46,7 @@
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -71,6 +72,7 @@
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;
@@ -214,14 +216,14 @@
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DataSourcesResource.class);
- final ConditionalMultibind<CoordinatorDuty> conditionalMultibind = ConditionalMultibind.create(
+ // Binding for Set of indexing service coordinator Duty
+ final ConditionalMultibind<CoordinatorDuty> conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create(
properties,
binder,
CoordinatorDuty.class,
CoordinatorIndexingServiceDuty.class
);
-
- if (conditionalMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
+ if (conditionalIndexingServiceDutyMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
throw new UnsupportedOperationException(
"'druid.coordinator.merge.on' is not supported anymore. "
+ "Please consider using Coordinator's automatic compaction instead. "
@@ -230,19 +232,31 @@
+ "for more details about compaction."
);
}
-
- conditionalMultibind.addConditionBinding(
+ conditionalIndexingServiceDutyMultibind.addConditionBinding(
"druid.coordinator.kill.on",
Predicates.equalTo("true"),
KillUnusedSegments.class
);
- conditionalMultibind.addConditionBinding(
+ conditionalIndexingServiceDutyMultibind.addConditionBinding(
"druid.coordinator.kill.pendingSegments.on",
"true",
Predicates.equalTo("true"),
KillStalePendingSegments.class
);
+ // Binding for Set of metadata store management coordinator Duty
+ final ConditionalMultibind<CoordinatorDuty> conditionalMetadataStoreManagementDutyMultibind = ConditionalMultibind.create(
+ properties,
+ binder,
+ CoordinatorDuty.class,
+ CoordinatorMetadataStoreManagementDuty.class
+ );
+ conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
+ "druid.coordinator.kill.audit.on",
+ Predicates.equalTo("true"),
+ KillAuditLog.class
+ );
+
bindNodeRoleAndAnnouncer(
binder,
Coordinator.class,
diff --git a/website/.spelling b/website/.spelling
index bd2ecfa..315e5ff 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1683,6 +1683,7 @@
PT24H
PT300S
PT30S
+PT3600S
PT5M
PT5S
PT60S