/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Tests {@link RealtimePlumberSchool} and the {@link RealtimePlumber} instances it creates:
 * persisting, commit-metadata recovery, restoring hydrants with gaps, and dimension-order
 * inheritance across hydrants.
 */
@RunWith(Parameterized.class)
public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
{
@Parameterized.Parameters(name = "rejectionPolicy = {0}, segmentWriteOutMediumFactory = {1}")
public static Collection<?> constructorFeeder()
{
final RejectionPolicyFactory[] rejectionPolicies = new RejectionPolicyFactory[]{
new NoopRejectionPolicyFactory(),
new MessageTimeRejectionPolicyFactory()
};
final List<Object[]> constructors = new ArrayList<>();
for (RejectionPolicyFactory rejectionPolicy : rejectionPolicies) {
constructors.add(new Object[]{rejectionPolicy, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
constructors.add(new Object[]{rejectionPolicy, TmpFileSegmentWriteOutMediumFactory.instance()});
}
return constructors;
  }

private final RejectionPolicyFactory rejectionPolicy;
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
private RealtimePlumber plumber;
private RealtimePlumberSchool realtimePlumberSchool;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private SegmentHandoffNotifier handoffNotifier;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig;
private DataSchema schema;
private DataSchema schema2;
private FireDepartmentMetrics metrics;
  private File tmpDir;

public RealtimePlumberSchoolTest(
RejectionPolicyFactory rejectionPolicy,
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
)
{
this.rejectionPolicy = rejectionPolicy;
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
}
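
  /**
   * Builds two test schemas (hourly and yearly segment granularity), mocks the plumber's
   * collaborators, and creates the plumber under test using the parameterized rejection
   * policy and segment write-out medium factory.
   */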
@Before
public void setUp() throws Exception
{
tmpDir = FileUtils.createTempDir();
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
),
null
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null),
null,
jsonMapper
);
schema2 = new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
),
null
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularities.YEAR, Granularities.NONE, null),
null,
jsonMapper
);
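
    // Collaborator mocks: the announcer and emitter are strict mocks (verified in tearDown()),
    // while the publisher, pusher, and handoff notifiers are nice mocks.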
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class);
EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString()))
.andReturn(handoffNotifier)
.anyTimes();
EasyMock.expect(
handoffNotifier.registerSegmentHandoffCallback(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject()
)
).andReturn(true).anyTimes();
emitter = EasyMock.createMock(ServiceEmitter.class);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
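
    // maxRowsInMemory = 1, so each added row fills the sink's in-memory index and every
    // add() triggers an intermediate persist, yielding one hydrant per row.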
tuningConfig = new RealtimeTuningConfig(
null,
1,
null,
null,
null,
null,
new IntervalStartVersioningPolicy(),
rejectionPolicy,
null,
null,
null,
null,
true,
0,
0,
false,
null,
null,
null,
null
);
realtimePlumberSchool = new RealtimePlumberSchool(
emitter,
new DefaultQueryRunnerFactoryConglomerate(new HashMap<>()),
dataSegmentPusher,
announcer,
segmentPublisher,
handoffNotifierFactory,
Execs.directExecutor(),
NoopJoinableFactory.INSTANCE,
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
TestHelper.getTestIndexIO(),
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
new CachePopulatorStats(),
TestHelper.makeJsonMapper()
);
metrics = new FireDepartmentMetrics();
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
  }

@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
FileUtils.deleteDirectory(
new File(
tuningConfig.getBasePersistDirectory(),
schema.getDataSource()
)
);
FileUtils.deleteDirectory(tmpDir);
  }

@Test(timeout = 60_000L)
public void testPersist() throws Exception
{
testPersist(null);
}
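
  /**
   * Persists with commit metadata, then verifies that a fresh plumber for the same schema
   * recovers that metadata from disk in startJob().
   */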
@Test(timeout = 60_000L)
public void testPersistWithCommitMetadata() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testPersist(commitMetadata);
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
Assert.assertEquals(commitMetadata, plumber.startJob());
}
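
  /**
   * Adds a single row to a new sink, persists with the given commit metadata, and waits for
   * the committer to run before finishing the job.
   */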
private void testPersist(final Object commitMetadata) throws Exception
{
Sink sink = new Sink(
Intervals.utc(0, TimeUnit.HOURS.toMillis(1)),
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
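    // Sinks are keyed by the start of their interval in epoch millis; this one starts at 0.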
plumber.getSinks().put(0L, sink);
Assert.assertNull(plumber.startJob());
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
doneSignal.countDown();
}
};
plumber.add(row, Suppliers.ofInstance(committer));
plumber.persist(committer);
doneSignal.await();
plumber.getSinks().clear();
plumber.finishJob();
}
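
  /**
   * Verifies that a committer that throws during persist increments the failedPersists
   * metric rather than propagating the exception.
   */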
@Test(timeout = 60_000L)
public void testPersistFails() throws Exception
{
Sink sink = new Sink(
Intervals.utc(0, TimeUnit.HOURS.toMillis(1)),
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
plumber.startJob();
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
plumber.add(row, Suppliers.ofInstance(Committers.nil()));
final CountDownLatch doneSignal = new CountDownLatch(1);
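    // Persist with a committer that counts down the latch and then throws.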
plumber.persist(
supplierFromRunnable(
() -> {
doneSignal.countDown();
throw new RuntimeException();
}
).get()
);
doneSignal.await();
// Exception may need time to propagate
while (metrics.failedPersists() < 1) {
Thread.sleep(100);
}
Assert.assertEquals(1, metrics.failedPersists());
  }

@Test(timeout = 60_000L)
public void testPersistHydrantGaps() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testPersistHydrantGapsHelper(commitMetadata);
}
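
  /**
   * Persists five rows into five hydrants, deletes hydrants 1 and 3 from disk, and checks
   * that bootstrapSinksFromDisk() restores the surviving hydrants (counts 0, 2, 4) with the
   * full segment interval. Deleting the remaining hydrants should leave nothing to restore.
   */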
private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception
{
Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01"));
RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
Sink sink = new Sink(
testInterval,
schema2,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber2.getSinks().put(0L, sink);
Assert.assertNull(plumber2.startJob());
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
doneSignal.countDown();
}
};
plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer));
plumber2.persist(committer);
doneSignal.await();
plumber2.getSinks().clear();
plumber2.finishJob();
File persistDir = plumber2.computePersistDir(schema2, testInterval);
/* Check that all hydrants were persisted */
for (int i = 0; i < 5; i++) {
Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists());
}
/* Create some gaps in the persisted hydrants and reload */
FileUtils.deleteDirectory(new File(persistDir, "1"));
FileUtils.deleteDirectory(new File(persistDir, "3"));
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber.bootstrapSinksFromDisk();
Map<Long, Sink> sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());
    List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
DateTime startTime = DateTimes.of("1970-01-01T00:00:00.000Z");
Interval expectedInterval = new Interval(startTime, DateTimes.of("1971-01-01T00:00:00.000Z"));
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(0).getSegmentDataInterval()
);
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(1).getSegmentDataInterval()
);
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(
expectedInterval,
hydrants.get(2).getSegmentDataInterval()
);
/* Delete all the hydrants and reload, no sink should be created */
FileUtils.deleteDirectory(new File(persistDir, "0"));
FileUtils.deleteDirectory(new File(persistDir, "2"));
FileUtils.deleteDirectory(new File(persistDir, "4"));
RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber2.bootstrapSinksFromDisk();
Assert.assertEquals(0, restoredPlumber2.getSinks().size());
}
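
  /**
   * Verifies that persisted hydrants record dimensions in first-seen order, and that the
   * final hydrant inherits the accumulated order (dimD, dimC, dimA, dimB, dimE) rather than
   * the order in which the last row listed its dimensions.
   */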
@Test(timeout = 60_000L)
public void testDimOrderInheritance() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testDimOrderInheritanceHelper(commitMetadata);
  }

private void testDimOrderInheritanceHelper(final Object commitMetadata) throws Exception
{
List<List<String>> expectedDims = ImmutableList.of(
ImmutableList.of("dimD"),
ImmutableList.of("dimC"),
ImmutableList.of("dimA"),
ImmutableList.of("dimB"),
ImmutableList.of("dimE"),
ImmutableList.of("dimD", "dimC", "dimA", "dimB", "dimE")
);
QueryableIndex qindex;
FireHydrant hydrant;
Map<Long, Sink> sinks;
    // Use a distinct local name to avoid shadowing the plumber field created in setUp().
    RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
    Assert.assertNull(plumber2.startJob());
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
doneSignal.countDown();
}
};
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimD"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimC"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimA"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimB"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimE"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
    plumber2.persist(committer);
    doneSignal.await();
    plumber2.getSinks().clear();
    plumber2.finishJob();
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber.bootstrapSinksFromDisk();
sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());
List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
for (int i = 0; i < hydrants.size(); i++) {
hydrant = hydrants.get(i);
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
try {
qindex = segment.asQueryableIndex();
Assert.assertEquals(i, hydrant.getCount());
Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions()));
}
finally {
segment.decrement();
}
}
}
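
  /** Returns an InputRow with the given timestamp and no dimensions or metrics. */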
private InputRow getTestInputRow(final String timeStr)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return new ArrayList<>();
}
@Override
public long getTimestampFromEpoch()
{
return DateTimes.of(timeStr).getMillis();
}
@Override
public DateTime getTimestamp()
{
return DateTimes.of(timeStr);
}
@Override
public List<String> getDimension(String dimension)
{
return new ArrayList<>();
}
@Override
public Number getMetric(String metric)
{
return 0;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}
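
  /** Returns an InputRow with the given timestamp, dimension names, and dimension values. */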
private InputRow getTestInputRowFull(final String timeStr, final List<String> dims, final List<String> dimVals)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return dims;
}
@Override
public long getTimestampFromEpoch()
{
return DateTimes.of(timeStr).getMillis();
}
@Override
public DateTime getTimestamp()
{
return DateTimes.of(timeStr);
}
@Override
public List<String> getDimension(String dimension)
{
return dimVals;
}
@Override
public Number getMetric(String metric)
{
return 0;
}
@Override
public Object getRaw(String dimension)
{
return dimVals;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}
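
  /** Wraps a Runnable in a Committer that carries no commit metadata. */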
private static Supplier<Committer> supplierFromRunnable(final Runnable runnable)
{
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
runnable.run();
}
};
return Suppliers.ofInstance(committer);
}
}