blob: a7701c4e104261aad29d85f10f046eda073355bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.config.MetadataCleanupConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
public class KillUnreferencedSegmentSchemaDutyTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
new TestDerbyConnector.DerbyConnectorRule(CentralizedDatasourceSchemaConfig.create(true));
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
private TestDerbyConnector derbyConnector;
private MetadataStorageTablesConfig tablesConfig;
private SegmentSchemaManager segmentSchemaManager;
private FingerprintGenerator fingerprintGenerator;
private SegmentSchemaTestUtils segmentSchemaTestUtils;
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
@Before
public void setUp()
{
derbyConnector = derbyConnectorRule.getConnector();
tablesConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
derbyConnector.createSegmentSchemasTable();
derbyConnector.createSegmentTable();
fingerprintGenerator = new FingerprintGenerator(mapper);
segmentSchemaManager = new SegmentSchemaManager(derbyConnectorRule.metadataTablesConfigSupplier().get(), mapper, derbyConnector);
segmentSchemaTestUtils = new SegmentSchemaTestUtils(derbyConnectorRule, derbyConnector, mapper);
CoordinatorRunStats runStats = new CoordinatorRunStats();
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
}
@Test
public void testKillUnreferencedSchema()
{
List<DateTime> dateTimes = new ArrayList<>();
DateTime now = DateTimes.nowUtc();
dateTimes.add(now);
dateTimes.add(now.plusMinutes(61));
dateTimes.add(now.plusMinutes(6 * 60 + 1));
final MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
true,
Period.parse("PT1H").toStandardDuration(),
Period.parse("PT6H").toStandardDuration()
);
KillUnreferencedSegmentSchemaDuty duty =
new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
Set<DataSegment> segments = new HashSet<>();
List<SegmentSchemaManager.SegmentSchemaMetadataPlus> schemaMetadataPluses = new ArrayList<>();
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
SchemaPayloadPlus schemaMetadata = new SchemaPayloadPlus(schemaPayload, (long) 1);
DataSegment segment1 = new DataSegment(
"foo",
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-01",
ImmutableMap.of("path", "a-1"),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
DataSegment segment2 = new DataSegment(
"foo",
Intervals.of("2023-01-02/2023-01-03"),
"2023-01-02",
ImmutableMap.of("path", "a-1"),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
segments.add(segment1);
segments.add(segment2);
String fingerprint =
fingerprintGenerator.generateFingerprint(
schemaPayload,
segment1.getDataSource(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
SegmentSchemaManager.SegmentSchemaMetadataPlus plus1 =
new SegmentSchemaManager.SegmentSchemaMetadataPlus(
segment1.getId(),
fingerprint,
schemaMetadata
);
schemaMetadataPluses.add(plus1);
SegmentSchemaManager.SegmentSchemaMetadataPlus plus2 =
new SegmentSchemaManager.SegmentSchemaMetadataPlus(
segment2.getId(),
fingerprint,
schemaMetadata
);
schemaMetadataPluses.add(plus2);
segmentSchemaTestUtils.insertUsedSegments(segments, Collections.emptyMap());
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
"foo",
schemaMetadataPluses,
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
// delete segment1
deleteSegment(schemaMetadataPluses.get(0).getSegmentId());
// this call should do nothing
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.TRUE, getSchemaUsedStatus(fingerprint));
// delete segment2
deleteSegment(schemaMetadataPluses.get(1).getSegmentId());
// this call should mark the schema unused
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.FALSE, getSchemaUsedStatus(fingerprint));
// this call should delete the schema
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertNull(getSchemaUsedStatus(fingerprint));
}
@Test
public void testKillUnreferencedSchema_repair()
{
List<DateTime> dateTimes = new ArrayList<>();
DateTime now = DateTimes.nowUtc();
dateTimes.add(now);
dateTimes.add(now.plusMinutes(61));
MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
true,
Period.parse("PT1H").toStandardDuration(),
Period.parse("PT6H").toStandardDuration()
);
KillUnreferencedSegmentSchemaDuty duty =
new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
String fingerprint = fingerprintGenerator.generateFingerprint(schemaPayload, "foo", CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
inHandle(
handle -> {
segmentSchemaManager.persistSegmentSchema(
handle,
"foo",
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
Collections.singletonMap(fingerprint, schemaPayload)
);
return null;
}
);
Assert.assertEquals(Boolean.TRUE, getSchemaUsedStatus(fingerprint));
// this call should mark the schema as unused
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.FALSE, getSchemaUsedStatus(fingerprint));
// associate a segment to the schema
DataSegment segment1 = new DataSegment(
"foo",
Intervals.of("2023-01-02/2023-01-03"),
"2023-01-02",
ImmutableMap.of("path", "a-1"),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment1), Collections.emptyMap());
inHandle(
handle -> handle.createStatement(
StringUtils.format(
"UPDATE %s SET schema_fingerprint = '%s', num_rows = 100 WHERE id = '%s'",
tablesConfig.getSegmentsTable(), fingerprint, segment1.getId().toString()
)).execute()
);
// this call should make the schema used
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.TRUE, getSchemaUsedStatus(fingerprint));
}
@Test
public void testKillOlderVersionSchema()
{
List<DateTime> dateTimes = new ArrayList<>();
DateTime now = DateTimes.nowUtc();
dateTimes.add(now);
dateTimes.add(now.plusMinutes(61));
dateTimes.add(now.plusMinutes(6 * 60 + 1));
MetadataCleanupConfig cleanupConfig = new MetadataCleanupConfig(
true,
Period.parse("PT1H").toStandardDuration(),
Period.parse("PT6H").toStandardDuration()
);
KillUnreferencedSegmentSchemaDuty duty =
new TestKillUnreferencedSegmentSchemasDuty(cleanupConfig, segmentSchemaManager, dateTimes);
// create 2 versions of same schema
// unreferenced one should get deleted
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
String fingerprintOldVersion = fingerprintGenerator.generateFingerprint(schemaPayload, "foo", 0);
String fingerprintNewVersion = fingerprintGenerator.generateFingerprint(schemaPayload, "foo", 1);
inHandle(
handle -> {
segmentSchemaManager.persistSegmentSchema(
handle,
"foo",
0,
Collections.singletonMap(fingerprintOldVersion, schemaPayload)
);
return null;
}
);
inHandle(
handle -> {
segmentSchemaManager.persistSegmentSchema(
handle,
"foo",
1,
Collections.singletonMap(fingerprintNewVersion, schemaPayload)
);
return null;
}
);
// this call should mark both the schema as unused
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.FALSE, getSchemaUsedStatus(fingerprintOldVersion));
Assert.assertEquals(Boolean.FALSE, getSchemaUsedStatus(fingerprintNewVersion));
// associate a segment to the schema
DataSegment segment1 = new DataSegment(
"foo",
Intervals.of("2023-01-02/2023-01-03"),
"2023-01-02",
ImmutableMap.of("path", "a-1"),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
segmentSchemaTestUtils.insertUsedSegments(Collections.singleton(segment1), Collections.emptyMap());
inHandle(
handle -> handle.createStatement(
StringUtils.format(
"UPDATE %s SET schema_fingerprint = '%s', num_rows = 100 WHERE id = '%s'",
tablesConfig.getSegmentsTable(), fingerprintNewVersion, segment1.getId().toString()
)).execute()
);
// this call should make the referenced schema used
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertEquals(Boolean.TRUE, getSchemaUsedStatus(fingerprintNewVersion));
// this call should kill the schema
duty.run(mockDruidCoordinatorRuntimeParams);
Assert.assertNull(getSchemaUsedStatus(fingerprintOldVersion));
}
private static class TestKillUnreferencedSegmentSchemasDuty extends KillUnreferencedSegmentSchemaDuty
{
private final List<DateTime> dateTimes;
private int index = -1;
public TestKillUnreferencedSegmentSchemasDuty(
MetadataCleanupConfig config,
SegmentSchemaManager segmentSchemaManager,
List<DateTime> dateTimes
)
{
super(config, segmentSchemaManager);
this.dateTimes = dateTimes;
}
@Override
protected DateTime getCurrentTime()
{
index++;
return dateTimes.get(index);
}
}
private <T> T inHandle(HandleCallback<T> callback)
{
return derbyConnector.retryWithHandle(callback);
}
private void deleteSegment(SegmentId id)
{
inHandle(handle -> {
Update deleteStatement = handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE id = '%s'",
tablesConfig.getSegmentsTable(),
id.toString()
)
);
deleteStatement.execute();
return null;
});
}
private Boolean getSchemaUsedStatus(String fingerprint)
{
List<Boolean> usedStatus = inHandle(
handle -> handle.createQuery(StringUtils.format(
"SELECT used from %s where fingerprint = '%s'",
tablesConfig.getSegmentSchemasTable(), fingerprint
)).mapTo(Boolean.class).list()
);
return usedStatus.isEmpty() ? null : usedStatus.get(0);
}
}