blob: eb8f9358cef80326e7976064f70d5528065bf67e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final List<InputRow> ROWS = ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "foo", "met1", 2.0)
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("dim2"),
ImmutableMap.of("dim2", "bar", "met1", 2.0)
)
);
SegmentAllocator allocator;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
StreamAppenderatorDriver driver;
DataSegmentKiller dataSegmentKiller;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp()
{
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
}
@After
public void tearDown() throws Exception
{
if (driver != null) {
driver.clear();
driver.close();
}
}
@Test
public void testFailDuringPersist() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Fail test while persisting segments"
+ "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
+ "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
driver = new StreamAppenderatorDriver(
createPersistFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob(null);
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob(null));
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
driver.publish(
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
public void testFailDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Fail test while pushing segments"
+ "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
+ "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
driver = new StreamAppenderatorDriver(
createPushFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob(null);
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob(null));
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
driver.publish(
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
@Test
public void testFailDuringDrop() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
"Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
);
driver = new StreamAppenderatorDriver(
createDropFailAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob(null);
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob(null));
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
final SegmentsAndCommitMetadata published = driver.publish(
StreamAppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
driver.registerHandoff(published).get();
}
@Test
public void testFailDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Failed to publish segments because of [test]");
testFailDuringPublishInternal(false);
}
@Test
public void testFailWithExceptionDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(RuntimeException.class));
expectedException.expectMessage("test");
testFailDuringPublishInternal(true);
}
private void testFailDuringPublishInternal(boolean failWithException) throws Exception
{
driver = new StreamAppenderatorDriver(
new FailableAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob(null);
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob(null));
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
if (!failWithException) {
// Should only kill segments if there was _no_ exception.
dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
}
EasyMock.replay(dataSegmentKiller);
try {
driver.publish(
StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw e;
}
finally {
EasyMock.verify(dataSegmentKiller);
}
}
private static class NoopUsedSegmentChecker implements UsedSegmentChecker
{
@Override
public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
{
return ImmutableSet.of();
}
}
static Appenderator createPushFailAppenderator()
{
return new FailableAppenderator().disablePush();
}
static Appenderator createPushInterruptAppenderator()
{
return new FailableAppenderator().interruptPush();
}
static Appenderator createPersistFailAppenderator()
{
return new FailableAppenderator().disablePersist();
}
static Appenderator createDropFailAppenderator()
{
return new FailableAppenderator().disableDrop();
}
private static class FailableAppenderator implements Appenderator
{
private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new TreeMap<>();
private boolean dropEnabled = true;
private boolean persistEnabled = true;
private boolean pushEnabled = true;
private boolean interruptPush = false;
private int numRows;
public FailableAppenderator disableDrop()
{
dropEnabled = false;
return this;
}
public FailableAppenderator disablePersist()
{
persistEnabled = false;
return this;
}
public FailableAppenderator disablePush()
{
pushEnabled = false;
interruptPush = false;
return this;
}
public FailableAppenderator interruptPush()
{
pushEnabled = false;
interruptPush = true;
return this;
}
@Override
public String getId()
{
return null;
}
@Override
public String getDataSource()
{
return null;
}
@Override
public Object startJob()
{
return null;
}
@Override
public AppenderatorAddResult add(
SegmentIdWithShardSpec identifier,
InputRow row,
Supplier<Committer> committerSupplier,
boolean allowIncrementalPersists
)
{
rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
numRows++;
return new AppenderatorAddResult(identifier, numRows, false);
}
@Override
public List<SegmentIdWithShardSpec> getSegments()
{
return ImmutableList.copyOf(rows.keySet());
}
@Override
public int getRowCount(SegmentIdWithShardSpec identifier)
{
final List<InputRow> rows = this.rows.get(identifier);
if (rows != null) {
return rows.size();
} else {
return 0;
}
}
@Override
public int getTotalRowCount()
{
return numRows;
}
@Override
public void clear()
{
rows.clear();
}
@Override
public ListenableFuture<?> drop(SegmentIdWithShardSpec identifier)
{
if (dropEnabled) {
rows.remove(identifier);
return Futures.immediateFuture(null);
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while dropping segment[%s]", identifier));
}
}
@Override
public ListenableFuture<Object> persistAll(Committer committer)
{
if (persistEnabled) {
// do nothing
return Futures.immediateFuture(committer.getMetadata());
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", rows.keySet()));
}
}
@Override
public ListenableFuture<SegmentsAndCommitMetadata> push(
Collection<SegmentIdWithShardSpec> identifiers,
Committer committer,
boolean useUniquePath
)
{
if (pushEnabled) {
final List<DataSegment> segments = identifiers.stream()
.map(
id -> new DataSegment(
id.getDataSource(),
id.getInterval(),
id.getVersion(),
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
id.getShardSpec(),
0,
0
)
)
.collect(Collectors.toList());
return Futures.transform(
persistAll(committer),
(Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> new SegmentsAndCommitMetadata(segments, commitMetadata, null),
MoreExecutors.directExecutor()
);
} else {
if (interruptPush) {
return new AbstractFuture<SegmentsAndCommitMetadata>()
{
@Override
public SegmentsAndCommitMetadata get(long timeout, TimeUnit unit)
throws InterruptedException
{
throw new InterruptedException("Interrupt test while pushing segments");
}
@Override
public SegmentsAndCommitMetadata get() throws InterruptedException
{
throw new InterruptedException("Interrupt test while pushing segments");
}
};
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while pushing segments[%s]", identifiers));
}
}
}
@Override
public void close()
{
}
@Override
public void closeNow()
{
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
throw new UnsupportedOperationException();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
throw new UnsupportedOperationException();
}
}
}