| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.common.task; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import org.apache.druid.client.cache.CacheConfig; |
| import org.apache.druid.client.cache.CachePopulatorStats; |
| import org.apache.druid.client.cache.MapCache; |
| import org.apache.druid.client.indexing.NoopIndexingServiceClient; |
| import org.apache.druid.common.config.NullHandling; |
| import org.apache.druid.data.input.FirehoseFactory; |
| import org.apache.druid.data.input.impl.DimensionsSpec; |
| import org.apache.druid.data.input.impl.MapInputRowParser; |
| import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; |
| import org.apache.druid.data.input.impl.TimestampSpec; |
| import org.apache.druid.discovery.DataNodeService; |
| import org.apache.druid.discovery.DruidNodeAnnouncer; |
| import org.apache.druid.discovery.LookupNodeService; |
| import org.apache.druid.indexer.TaskState; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.SegmentLoaderFactory; |
| import org.apache.druid.indexing.common.TaskToolbox; |
| import org.apache.druid.indexing.common.TaskToolboxFactory; |
| import org.apache.druid.indexing.common.TestFirehose; |
| import org.apache.druid.indexing.common.TestUtils; |
| import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; |
| import org.apache.druid.indexing.common.actions.TaskActionClientFactory; |
| import org.apache.druid.indexing.common.actions.TaskActionToolbox; |
| import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.common.config.TaskStorageConfig; |
| import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; |
| import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; |
| import org.apache.druid.indexing.overlord.TaskLockbox; |
| import org.apache.druid.indexing.overlord.TaskStorage; |
| import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; |
| import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; |
| import org.apache.druid.indexing.test.TestDataSegmentKiller; |
| import org.apache.druid.indexing.test.TestDataSegmentPusher; |
| import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.jackson.JacksonUtils; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.java.util.common.parsers.ParseException; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.core.NoopEmitter; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.java.util.metrics.MonitorScheduler; |
| import org.apache.druid.math.expr.ExprMacroTable; |
| import org.apache.druid.metadata.EntryExistsException; |
| import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; |
| import org.apache.druid.query.Druids; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.QueryPlus; |
| import org.apache.druid.query.QueryRunnerFactoryConglomerate; |
| import org.apache.druid.query.QueryWatcher; |
| import org.apache.druid.query.Result; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.filter.DimFilter; |
| import org.apache.druid.query.filter.SelectorDimFilter; |
| import org.apache.druid.query.timeseries.TimeseriesQuery; |
| import org.apache.druid.query.timeseries.TimeseriesQueryEngine; |
| import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; |
| import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; |
| import org.apache.druid.query.timeseries.TimeseriesResultValue; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.RealtimeIOConfig; |
| import org.apache.druid.segment.indexing.RealtimeTuningConfig; |
| import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; |
| import org.apache.druid.segment.join.NoopJoinableFactory; |
| import org.apache.druid.segment.realtime.FireDepartment; |
| import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; |
| import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; |
| import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; |
| import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; |
| import org.apache.druid.segment.transform.ExpressionTransform; |
| import org.apache.druid.segment.transform.TransformSpec; |
| import org.apache.druid.server.DruidNode; |
| import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; |
| import org.apache.druid.server.coordination.ServerType; |
| import org.apache.druid.server.security.AuthTestUtils; |
| import org.apache.druid.testing.InitializedNullHandlingTest; |
| import org.apache.druid.timeline.DataSegment; |
| import org.easymock.EasyMock; |
| import org.hamcrest.CoreMatchers; |
| import org.joda.time.DateTime; |
| import org.joda.time.Period; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.internal.matchers.ThrowableMessageMatcher; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.nio.file.Files; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| |
| public class RealtimeIndexTaskTest extends InitializedNullHandlingTest |
| { |
| private static final Logger log = new Logger(RealtimeIndexTaskTest.class); |
| private static final ServiceEmitter EMITTER = new ServiceEmitter( |
| "service", |
| "host", |
| new NoopEmitter() |
| ); |
| |
| @Rule |
| public final ExpectedException expectedException = ExpectedException.none(); |
| |
| @Rule |
| public final TemporaryFolder tempFolder = new TemporaryFolder(); |
| |
| private DateTime now; |
| private ListeningExecutorService taskExec; |
| private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks; |
| |
| @Before |
| public void setUp() |
| { |
| EmittingLogger.registerEmitter(EMITTER); |
| EMITTER.start(); |
| taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d")); |
| now = DateTimes.nowUtc(); |
| } |
| |
| @After |
| public void tearDown() |
| { |
| taskExec.shutdownNow(); |
| } |
| |
| @Test |
| public void testMakeTaskId() |
| { |
| Assert.assertEquals( |
| "index_realtime_test_0_2015-01-02T00:00:00.000Z_abcdefgh", |
| RealtimeIndexTask.makeTaskId("test", 0, DateTimes.of("2015-01-02"), "abcdefgh") |
| ); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testDefaultResource() |
| { |
| final RealtimeIndexTask task = makeRealtimeTask(null); |
| Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testSupportsQueries() |
| { |
| final RealtimeIndexTask task = makeRealtimeTask(null); |
| Assert.assertTrue(task.supportsQueries()); |
| } |
| |
| @Test(timeout = 60_000L, expected = ExecutionException.class) |
| public void testHandoffTimeout() throws Exception |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L); |
| final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1") |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| Assert.assertEquals(1, task.getMetrics().processed()); |
| Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished())); |
| |
| |
| // handoff would timeout, resulting in exception |
| statusFuture.get(); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testBasics() throws Exception |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task = makeRealtimeTask(null); |
| final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox); |
| final DataSegment publishedSegment; |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), |
| ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0), |
| ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); |
| |
| // Check metrics. |
| Assert.assertEquals(2, task.getMetrics().processed()); |
| Assert.assertEquals(1, task.getMetrics().thrownAway()); |
| Assert.assertEquals(0, task.getMetrics().unparseable()); |
| |
| // Do some queries. |
| Assert.assertEquals(2, sumMetric(task, null, "rows").longValue()); |
| Assert.assertEquals(3, sumMetric(task, null, "met1").longValue()); |
| |
| // Simulate handoff. |
| for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) { |
| final Pair<Executor, Runnable> executorRunnablePair = entry.getValue(); |
| Assert.assertEquals( |
| new SegmentDescriptor( |
| publishedSegment.getInterval(), |
| publishedSegment.getVersion(), |
| publishedSegment.getShardSpec().getPartitionNum() |
| ), |
| entry.getKey() |
| ); |
| executorRunnablePair.lhs.execute(executorRunnablePair.rhs); |
| } |
| handOffCallbacks.clear(); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testTransformSpec() throws Exception |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final TransformSpec transformSpec = new TransformSpec( |
| new SelectorDimFilter("dim1", "foo", null), |
| ImmutableList.of( |
| new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()) |
| ) |
| ); |
| final RealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0); |
| final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox); |
| final DataSegment publishedSegment; |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), |
| ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0), |
| ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); |
| |
| // Check metrics. |
| Assert.assertEquals(1, task.getMetrics().processed()); |
| Assert.assertEquals(2, task.getMetrics().thrownAway()); |
| Assert.assertEquals(0, task.getMetrics().unparseable()); |
| |
| // Do some queries. |
| Assert.assertEquals(1, sumMetric(task, null, "rows").longValue()); |
| Assert.assertEquals(1, sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", null), "rows").longValue()); |
| if (NullHandling.replaceWithDefault()) { |
| Assert.assertEquals(0, sumMetric(task, new SelectorDimFilter("dim1t", "barbar", null), "rows").longValue()); |
| } else { |
| Assert.assertNull(sumMetric(task, new SelectorDimFilter("dim1t", "barbar", null), "rows")); |
| |
| } |
| Assert.assertEquals(1, sumMetric(task, null, "met1").longValue()); |
| |
| // Simulate handoff. |
| for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) { |
| final Pair<Executor, Runnable> executorRunnablePair = entry.getValue(); |
| Assert.assertEquals( |
| new SegmentDescriptor( |
| publishedSegment.getInterval(), |
| publishedSegment.getVersion(), |
| publishedSegment.getShardSpec().getPartitionNum() |
| ), |
| entry.getKey() |
| ); |
| executorRunnablePair.lhs.execute(executorRunnablePair.rhs); |
| } |
| handOffCallbacks.clear(); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testReportParseExceptionsOnBadMetric() throws Exception |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task = makeRealtimeTask(null, true); |
| final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), |
| ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"), |
| ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for the task to finish. |
| expectedException.expect(ExecutionException.class); |
| expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class)); |
| expectedException.expectCause( |
| ThrowableMessageMatcher.hasMessage( |
| CoreMatchers.containsString("[Unable to parse value[foo] for field[met1]") |
| ) |
| ); |
| |
| statusFuture.get(); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testNoReportParseExceptions() throws Exception |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task = makeRealtimeTask(null, false); |
| final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox); |
| final DataSegment publishedSegment; |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task.getFirehose(); |
| |
| firehose.addRows( |
| Arrays.asList( |
| // Good row- will be processed. |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), |
| |
| // Null row- will be thrown away. |
| null, |
| |
| // Bad metric- will count as processed, but that particular metric won't update. |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), |
| |
| // Bad row- will be unparseable. |
| ImmutableMap.of("dim1", "foo", "met1", 2.0, TestFirehose.FAIL_DIM, "x"), |
| |
| // Old row- will be thrownAway. |
| ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0), |
| |
| // Good row- will be processed. |
| ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); |
| |
| // Check metrics. |
| Assert.assertEquals(3, task.getMetrics().processed()); |
| Assert.assertEquals(1, task.getMetrics().thrownAway()); |
| Assert.assertEquals(2, task.getMetrics().unparseable()); |
| |
| // Do some queries. |
| Assert.assertEquals(3, sumMetric(task, null, "rows").longValue()); |
| Assert.assertEquals(3, sumMetric(task, null, "met1").longValue()); |
| |
| // Simulate handoff. |
| for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) { |
| final Pair<Executor, Runnable> executorRunnablePair = entry.getValue(); |
| Assert.assertEquals( |
| new SegmentDescriptor( |
| publishedSegment.getInterval(), |
| publishedSegment.getVersion(), |
| publishedSegment.getShardSpec().getPartitionNum() |
| ), |
| entry.getKey() |
| ); |
| executorRunnablePair.lhs.execute(executorRunnablePair.rhs); |
| } |
| handOffCallbacks.clear(); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testRestore() throws Exception |
| { |
| final File directory = tempFolder.newFolder(); |
| final RealtimeIndexTask task1 = makeRealtimeTask(null); |
| final DataSegment publishedSegment; |
| |
| // First run: |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task1.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task1.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo") |
| ) |
| ); |
| |
| // Trigger graceful shutdown. |
| task1.stopGracefully(taskToolbox.getConfig()); |
| |
| // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| |
| // Nothing should be published. |
| Assert.assertEquals(new HashSet<>(), mdc.getPublished()); |
| } |
| |
| // Second run: |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); |
| final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task2.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| // Do a query, at this point the previous data should be loaded. |
| Assert.assertEquals(1, sumMetric(task2, null, "rows").longValue()); |
| |
| final TestFirehose firehose = (TestFirehose) task2.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim2", "bar") |
| ) |
| ); |
| |
| // Stop the firehose, this will drain out existing events. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); |
| |
| // Do a query. |
| Assert.assertEquals(2, sumMetric(task2, null, "rows").longValue()); |
| |
| // Simulate handoff. |
| for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) { |
| final Pair<Executor, Runnable> executorRunnablePair = entry.getValue(); |
| Assert.assertEquals( |
| new SegmentDescriptor( |
| publishedSegment.getInterval(), |
| publishedSegment.getVersion(), |
| publishedSegment.getShardSpec().getPartitionNum() |
| ), |
| entry.getKey() |
| ); |
| executorRunnablePair.lhs.execute(executorRunnablePair.rhs); |
| } |
| handOffCallbacks.clear(); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception |
| { |
| final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final File directory = tempFolder.newFolder(); |
| final RealtimeIndexTask task1 = makeRealtimeTask(null); |
| final DataSegment publishedSegment; |
| |
| // First run: |
| { |
| final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task1.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task1.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo") |
| ) |
| ); |
| |
| // Stop the firehose, this will trigger a finishJob. |
| firehose.close(); |
| |
| // Wait for publish. |
| while (mdc.getPublished().isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); |
| |
| // Do a query. |
| Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue()); |
| |
| // Trigger graceful shutdown. |
| task1.stopGracefully(taskToolbox.getConfig()); |
| |
| // Wait for the task to finish. The status doesn't really matter. |
| while (!statusFuture.isDone()) { |
| Thread.sleep(50); |
| } |
| } |
| |
| // Second run: |
| { |
| final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); |
| final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task2.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| // Stop the firehose again, this will start another handoff. |
| final TestFirehose firehose = (TestFirehose) task2.getFirehose(); |
| |
| // Stop the firehose, this will trigger a finishJob. |
| firehose.close(); |
| |
| // publishedSegment is still published. No reason it shouldn't be. |
| Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished()); |
| |
| // Wait for a handoffCallback to show up. |
| while (handOffCallbacks.isEmpty()) { |
| Thread.sleep(50); |
| } |
| |
| // Simulate handoff. |
| for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) { |
| final Pair<Executor, Runnable> executorRunnablePair = entry.getValue(); |
| Assert.assertEquals( |
| new SegmentDescriptor( |
| publishedSegment.getInterval(), |
| publishedSegment.getVersion(), |
| publishedSegment.getShardSpec().getPartitionNum() |
| ), |
| entry.getKey() |
| ); |
| executorRunnablePair.lhs.execute(executorRunnablePair.rhs); |
| } |
| handOffCallbacks.clear(); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testRestoreCorruptData() throws Exception |
| { |
| final File directory = tempFolder.newFolder(); |
| final RealtimeIndexTask task1 = makeRealtimeTask(null); |
| |
| // First run: |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox); |
| |
| // Wait for firehose to show up, it starts off null. |
| while (task1.getFirehose() == null) { |
| Thread.sleep(50); |
| } |
| |
| final TestFirehose firehose = (TestFirehose) task1.getFirehose(); |
| |
| firehose.addRows( |
| ImmutableList.of( |
| ImmutableMap.of("t", now.getMillis(), "dim1", "foo") |
| ) |
| ); |
| |
| // Trigger graceful shutdown. |
| task1.stopGracefully(taskToolbox.getConfig()); |
| |
| // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| |
| // Nothing should be published. |
| Assert.assertEquals(new HashSet<>(), mdc.getPublished()); |
| } |
| |
| // Corrupt the data: |
| final File smooshFile = new File( |
| StringUtils.format( |
| "%s/persistent/task/%s/work/persist/%s/%s_%s/0/00000.smoosh", |
| directory, |
| task1.getId(), |
| task1.getDataSource(), |
| Granularities.DAY.bucketStart(now), |
| Granularities.DAY.bucketEnd(now) |
| ) |
| ); |
| |
| Files.write(smooshFile.toPath(), StringUtils.toUtf8("oops!")); |
| |
| // Second run: |
| { |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); |
| final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox); |
| |
| // Wait for the task to finish. |
| boolean caught = false; |
| try { |
| statusFuture.get(); |
| } |
| catch (Exception e) { |
| caught = true; |
| } |
| Assert.assertTrue("expected exception", caught); |
| } |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testStopBeforeStarting() throws Exception |
| { |
| final File directory = tempFolder.newFolder(); |
| final RealtimeIndexTask task1 = makeRealtimeTask(null); |
| |
| final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); |
| final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); |
| task1.stopGracefully(taskToolbox.getConfig()); |
| final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox); |
| |
| // Wait for the task to finish. |
| final TaskStatus taskStatus = statusFuture.get(); |
| Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); |
| } |
| |
| private ListenableFuture<TaskStatus> runTask(final Task task, final TaskToolbox toolbox) |
| { |
| return taskExec.submit( |
| new Callable<TaskStatus>() |
| { |
| @Override |
| public TaskStatus call() throws Exception |
| { |
| try { |
| if (task.isReady(toolbox.getTaskActionClient())) { |
| return task.run(toolbox); |
| } else { |
| throw new ISE("Task is not ready"); |
| } |
| } |
| catch (Exception e) { |
| log.warn(e, "Task failed"); |
| throw e; |
| } |
| } |
| } |
| ); |
| } |
| |
| private RealtimeIndexTask makeRealtimeTask(final String taskId) |
| { |
| return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0); |
| } |
| |
| private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions) |
| { |
| return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0); |
| } |
| |
| private RealtimeIndexTask makeRealtimeTask( |
| final String taskId, |
| final TransformSpec transformSpec, |
| final boolean reportParseExceptions, |
| final long handoffTimeout |
| ) |
| { |
| ObjectMapper objectMapper = new DefaultObjectMapper(); |
| DataSchema dataSchema = new DataSchema( |
| "test_ds", |
| TestHelper.makeJsonMapper().convertValue( |
| new MapInputRowParser( |
| new TimeAndDimsParseSpec( |
| new TimestampSpec("t", "auto", null), |
| new DimensionsSpec( |
| DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t")), |
| null, |
| null |
| ) |
| ) |
| ), |
| JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT |
| ), |
| new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")}, |
| new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), |
| transformSpec, |
| objectMapper |
| ); |
| RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( |
| new TestFirehose.TestFirehoseFactory(), |
| null |
| ); |
| RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( |
| 1000, |
| null, |
| new Period("P1Y"), |
| new Period("PT10M"), |
| null, |
| null, |
| new ServerTimeRejectionPolicyFactory(), |
| null, |
| null, |
| null, |
| null, |
| true, |
| 0, |
| 0, |
| reportParseExceptions, |
| handoffTimeout, |
| null, |
| null, |
| null |
| ); |
| return new RealtimeIndexTask( |
| taskId, |
| null, |
| new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig), |
| null |
| ) |
| { |
| @Override |
| protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) |
| { |
| return true; |
| } |
| }; |
| } |
| |
| private TaskToolbox makeToolbox( |
| final Task task, |
| final IndexerMetadataStorageCoordinator mdc, |
| final File directory |
| ) |
| { |
| return makeToolbox( |
| task, |
| new HeapMemoryTaskStorage(new TaskStorageConfig(null)), |
| mdc, |
| directory |
| ); |
| } |
| |
| private TaskToolbox makeToolbox( |
| final Task task, |
| final TaskStorage taskStorage, |
| final IndexerMetadataStorageCoordinator mdc, |
| final File directory |
| ) |
| { |
| final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null); |
| final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc); |
| try { |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| } |
| catch (EntryExistsException e) { |
| // suppress |
| } |
| taskLockbox.syncFromStorage(); |
| final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( |
| taskLockbox, |
| taskStorage, |
| mdc, |
| EMITTER, |
| EasyMock.createMock(SupervisorManager.class) |
| ); |
| final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( |
| taskStorage, |
| taskActionToolbox, |
| new TaskAuditLogConfig(false) |
| ); |
| final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( |
| ImmutableMap.of( |
| TimeseriesQuery.class, |
| new TimeseriesQueryRunnerFactory( |
| new TimeseriesQueryQueryToolChest(), |
| new TimeseriesQueryEngine(), |
| new QueryWatcher() |
| { |
| @Override |
| public void registerQueryFuture(Query query, ListenableFuture future) |
| { |
| // do nothing |
| } |
| } |
| ) |
| ) |
| ); |
| handOffCallbacks = new ConcurrentHashMap<>(); |
| final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() |
| { |
| @Override |
| public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) |
| { |
| return new SegmentHandoffNotifier() |
| { |
| @Override |
| public boolean registerSegmentHandoffCallback( |
| SegmentDescriptor descriptor, |
| Executor exec, |
| Runnable handOffRunnable |
| ) |
| { |
| handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); |
| return true; |
| } |
| |
| @Override |
| public void start() |
| { |
| //Noop |
| } |
| |
| @Override |
| public void close() |
| { |
| //Noop |
| } |
| |
| }; |
| } |
| }; |
| final TestUtils testUtils = new TestUtils(); |
| final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( |
| taskConfig, |
| null, // taskExecutorNode |
| taskActionClientFactory, |
| EMITTER, |
| new TestDataSegmentPusher(), |
| new TestDataSegmentKiller(), |
| null, // DataSegmentMover |
| null, // DataSegmentArchiver |
| new TestDataSegmentAnnouncer(), |
| EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), |
| handoffNotifierFactory, |
| () -> conglomerate, |
| Execs.directExecutor(), // queryExecutorService |
| NoopJoinableFactory.INSTANCE, |
| () -> EasyMock.createMock(MonitorScheduler.class), |
| new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), |
| testUtils.getTestObjectMapper(), |
| testUtils.getTestIndexIO(), |
| MapCache.create(1024), |
| new CacheConfig(), |
| new CachePopulatorStats(), |
| testUtils.getTestIndexMergerV9(), |
| EasyMock.createNiceMock(DruidNodeAnnouncer.class), |
| EasyMock.createNiceMock(DruidNode.class), |
| new LookupNodeService("tier"), |
| new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), |
| new NoopTestTaskReportFileWriter(), |
| null, |
| AuthTestUtils.TEST_AUTHORIZER_MAPPER, |
| new NoopChatHandlerProvider(), |
| testUtils.getRowIngestionMetersFactory(), |
| new TestAppenderatorsManager(), |
| new NoopIndexingServiceClient(), |
| null, |
| null, |
| null |
| ); |
| |
| return toolboxFactory.build(task); |
| } |
| |
| @Nullable |
| public Long sumMetric(final Task task, final DimFilter filter, final String metric) |
| { |
| // Do a query. |
| TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() |
| .dataSource("test_ds") |
| .filters(filter) |
| .aggregators( |
| ImmutableList.of( |
| new LongSumAggregatorFactory(metric, metric) |
| ) |
| ).granularity(Granularities.ALL) |
| .intervals("2000/3000") |
| .build(); |
| |
| List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList(); |
| if (results.isEmpty()) { |
| return 0L; |
| } else { |
| return results.get(0).getValue().getLongMetric(metric); |
| } |
| } |
| } |