| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.kinesis.supervisor; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Optional; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.druid.data.input.InputFormat; |
| import org.apache.druid.data.input.impl.DimensionSchema; |
| import org.apache.druid.data.input.impl.DimensionsSpec; |
| import org.apache.druid.data.input.impl.JsonInputFormat; |
| import org.apache.druid.data.input.impl.StringDimensionSchema; |
| import org.apache.druid.data.input.impl.TimestampSpec; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.TaskInfoProvider; |
| import org.apache.druid.indexing.common.TestUtils; |
| import org.apache.druid.indexing.common.task.RealtimeIndexTask; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTask; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTaskClient; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; |
| import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig; |
| import org.apache.druid.indexing.kinesis.KinesisRecordSupplier; |
| import org.apache.druid.indexing.kinesis.KinesisSequenceNumber; |
| import org.apache.druid.indexing.overlord.DataSourceMetadata; |
| import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; |
| import org.apache.druid.indexing.overlord.TaskMaster; |
| import org.apache.druid.indexing.overlord.TaskQueue; |
| import org.apache.druid.indexing.overlord.TaskRunner; |
| import org.apache.druid.indexing.overlord.TaskRunnerListener; |
| import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; |
| import org.apache.druid.indexing.overlord.TaskStorage; |
| import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; |
| import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; |
| import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; |
| import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; |
| import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; |
| import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; |
| import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; |
| import org.apache.druid.indexing.seekablestream.common.RecordSupplier; |
| import org.apache.druid.indexing.seekablestream.common.StreamPartition; |
| import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; |
| import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.parsers.JSONPathSpec; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.metadata.EntryExistsException; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.CountAggregatorFactory; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.incremental.RowIngestionMetersFactory; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.RealtimeIOConfig; |
| import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; |
| import org.apache.druid.segment.realtime.FireDepartment; |
| import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; |
| import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; |
| import org.apache.druid.server.metrics.NoopServiceEmitter; |
| import org.easymock.Capture; |
| import org.easymock.CaptureType; |
| import org.easymock.EasyMock; |
| import org.easymock.EasyMockSupport; |
| import org.easymock.IAnswer; |
| import org.joda.time.DateTime; |
| import org.joda.time.Duration; |
| import org.joda.time.Period; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executor; |
| |
| public class KinesisSupervisorTest extends EasyMockSupport |
| { |
| private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); |
| private static final InputFormat INPUT_FORMAT = new JsonInputFormat( |
| new JSONPathSpec(true, ImmutableList.of()), |
| ImmutableMap.of(), |
| false |
| ); |
| private static final String DATASOURCE = "testDS"; |
| private static final int TEST_CHAT_THREADS = 3; |
| private static final long TEST_CHAT_RETRIES = 9L; |
| private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S"); |
| private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S"); |
| private static final String STREAM = "stream"; |
| private static final String SHARD_ID0 = "shardId-000000000000"; |
| private static final String SHARD_ID1 = "shardId-000000000001"; |
| private static final String SHARD_ID2 = "shardId-000000000002"; |
| private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); |
| private static final StreamPartition<String> SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); |
| private static final StreamPartition<String> SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); |
| private static DataSchema dataSchema; |
| private KinesisRecordSupplier supervisorRecordSupplier; |
| |
| private final int numThreads; |
| private TestableKinesisSupervisor supervisor; |
| private KinesisSupervisorTuningConfig tuningConfig; |
| private TaskStorage taskStorage; |
| private TaskMaster taskMaster; |
| private TaskRunner taskRunner; |
| private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; |
| private KinesisIndexTaskClient taskClient; |
| private TaskQueue taskQueue; |
| private RowIngestionMetersFactory rowIngestionMetersFactory; |
| private ExceptionCapturingServiceEmitter serviceEmitter; |
| private SupervisorStateManagerConfig supervisorConfig; |
| |
| public KinesisSupervisorTest() |
| { |
| this.numThreads = 1; |
| } |
| |
| @BeforeClass |
| public static void setupClass() |
| { |
| dataSchema = getDataSchema(DATASOURCE); |
| } |
| |
| @Before |
| public void setupTest() |
| { |
| taskStorage = createMock(TaskStorage.class); |
| taskMaster = createMock(TaskMaster.class); |
| taskRunner = createMock(TaskRunner.class); |
| indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class); |
| taskClient = createMock(KinesisIndexTaskClient.class); |
| taskQueue = createMock(TaskQueue.class); |
| supervisorRecordSupplier = createMock(KinesisRecordSupplier.class); |
| |
| tuningConfig = new KinesisSupervisorTuningConfig( |
| 1000, |
| null, |
| 50000, |
| null, |
| new Period("P1Y"), |
| new File("/test"), |
| null, |
| null, |
| null, |
| true, |
| false, |
| null, |
| null, |
| null, |
| null, |
| numThreads, |
| TEST_CHAT_THREADS, |
| TEST_CHAT_RETRIES, |
| TEST_HTTP_TIMEOUT, |
| TEST_SHUTDOWN_TIMEOUT, |
| null, |
| null, |
| null, |
| 5000, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null |
| ); |
| rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); |
| serviceEmitter = new ExceptionCapturingServiceEmitter(); |
| EmittingLogger.registerEmitter(serviceEmitter); |
| supervisorConfig = new SupervisorStateManagerConfig(); |
| } |
| |
| @After |
| public void tearDownTest() |
| { |
| supervisor = null; |
| } |
| |
| @Test |
| public void testNoInitialState() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task = captured.getValue(); |
| Assert.assertEquals(dataSchema, task.getDataSchema()); |
| Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), task.getTuningConfig()); |
| |
| KinesisIndexTaskIOConfig taskConfig = task.getIOConfig(); |
| Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); |
| Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); |
| Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); |
| Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); |
| |
| Assert.assertEquals(STREAM, taskConfig.getStartSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| "0", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| "0", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| |
| Assert.assertEquals(STREAM, taskConfig.getEndSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| |
| @Test |
| public void testRecordSupplier() |
| { |
| KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( |
| STREAM, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| 1, |
| 1, |
| new Period("PT30M"), |
| new Period("P1D"), |
| new Period("PT30S"), |
| false, |
| new Period("PT30M"), |
| null, |
| null, |
| null, |
| 100, |
| 1000, |
| null, |
| null, |
| false |
| ); |
| KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); |
| KinesisSupervisor supervisor = new KinesisSupervisor( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| clientFactory, |
| OBJECT_MAPPER, |
| new KinesisSupervisorSpec( |
| null, |
| dataSchema, |
| tuningConfig, |
| kinesisSupervisorIOConfig, |
| null, |
| false, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| clientFactory, |
| OBJECT_MAPPER, |
| new NoopServiceEmitter(), |
| new DruidMonitorSchedulerConfig(), |
| rowIngestionMetersFactory, |
| null, |
| new SupervisorStateManagerConfig() |
| ), |
| rowIngestionMetersFactory, |
| null |
| ); |
| |
| KinesisRecordSupplier supplier = (KinesisRecordSupplier) supervisor.setupRecordSupplier(); |
| Assert.assertNotNull(supplier); |
| Assert.assertEquals(0, supplier.bufferSize()); |
| Assert.assertEquals(Collections.emptySet(), supplier.getAssignment()); |
| // background fetch should not be enabled for supervisor supplier |
| supplier.start(); |
| Assert.assertFalse(supplier.isBackgroundFetchRunning()); |
| } |
| |
| @Test |
| public void testMultiTask() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock |
| .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) |
| .andReturn(new KinesisDataSourceMetadata(null)) |
| .anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task1 = captured.getValues().get(0); |
| Assert.assertEquals(1, task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals(1, task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals( |
| "0", |
| task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| |
| KinesisIndexTask task2 = captured.getValues().get(1); |
| Assert.assertEquals(1, task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals(1, task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals( |
| "0", |
| task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| |
| @Test |
| public void testReplicas() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task1 = captured.getValues().get(0); |
| Assert.assertEquals(2, task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals(2, task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals( |
| "0", |
| task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| Assert.assertEquals( |
| "0", |
| task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| |
| KinesisIndexTask task2 = captured.getValues().get(1); |
| Assert.assertEquals(2, task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals(2, task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); |
| Assert.assertEquals( |
| "0", |
| task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| Assert.assertEquals( |
| "0", |
| task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| |
| } |
| |
| @Test |
| public void testLateMessageRejectionPeriod() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task1 = captured.getValues().get(0); |
| KinesisIndexTask task2 = captured.getValues().get(1); |
| |
| Assert.assertTrue( |
| "minimumMessageTime", |
| task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow() |
| ); |
| Assert.assertTrue( |
| "minimumMessageTime", |
| task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow() |
| ); |
| Assert.assertEquals( |
| task1.getIOConfig().getMinimumMessageTime().get(), |
| task2.getIOConfig().getMinimumMessageTime().get() |
| ); |
| } |
| |
| @Test |
| public void testEarlyMessageRejectionPeriod() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task1 = captured.getValues().get(0); |
| KinesisIndexTask task2 = captured.getValues().get(1); |
| |
| Assert.assertTrue( |
| "maximumMessageTime", |
| task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 + 60).isAfterNow() |
| ); |
| Assert.assertTrue( |
| "maximumMessageTime", |
| task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 + 60).isBeforeNow() |
| ); |
| Assert.assertEquals( |
| task1.getIOConfig().getMaximumMessageTime().get(), |
| task2.getIOConfig().getMaximumMessageTime().get() |
| ); |
| } |
| |
| |
| /** |
| * Test generating the starting sequences from the partition data stored in druid_dataSource which contains the |
| * sequences of the last built segments. |
| */ |
| @Test |
| public void testDatasourceMetadata() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(SHARD1_PARTITION)).andReturn("2").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of(SHARD_ID1, "2", SHARD_ID0, "1"), |
| ImmutableSet.of() |
| ) |
| ) |
| ).anyTimes(); |
| |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisIndexTask task = captured.getValue(); |
| KinesisIndexTaskIOConfig taskConfig = task.getIOConfig(); |
| Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); |
| Assert.assertEquals( |
| "2", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| "1", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| |
| @Test |
| public void testBadMetadataOffsets() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of(SHARD_ID1, "101", SHARD_ID0, "-1"), |
| ImmutableSet.of() |
| ) |
| ) |
| ).anyTimes(); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| |
| Assert.assertEquals( |
| "org.apache.druid.java.util.common.ISE", |
| supervisor.getStateManager().getExceptionEvents().get(0).getExceptionClass() |
| ); |
| } |
| |
| @Test |
| public void testDontKillTasksWithMismatchedType() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // non KinesisIndexTask (don't kill) |
| Task id2 = new RealtimeIndexTask( |
| "id2", |
| null, |
| new FireDepartment( |
| dataSchema, |
| new RealtimeIOConfig(null, null), |
| null |
| ), |
| null |
| ); |
| |
| List<Task> existingTasks = ImmutableList.of(id2); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); |
| |
| replayAll(); |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| /** |
| * Exercises the supervisor against four pre-existing Kinesis tasks whose shard claims overlap: |
| * id1 (SHARD_ID1), id2 (SHARD_ID0), id3 (SHARD_ID0 and SHARD_ID1) and id4 (SHARD_ID0). |
| * The recorded expectations require that id3 is stopped via {@code taskClient.stopAsync("id3", false)}, |
| * that id4 is shut down via {@code taskQueue.shutdown(...)}, and that checkpoints are requested exactly |
| * once for each of id1, id2 and id4. The test then calls {@code verifyAll()}. |
| */ |
| @Test |
| public void testKillBadPartitionAssignment() throws Exception |
| { |
| supervisor = getTestableSupervisor( |
| 1, |
| 2, |
| true, |
| "PT1H", |
| null, |
| null |
| ); |
| |
| // Stub the record supplier: two shards, earliest sequence number "0", latest "100". |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // The third createKinesisIndexTask argument is presumably the task group id -- confirm against the helper. |
| // id3 claims both shards; id2 and id4 both claim SHARD_ID0. |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of()), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "12")), |
| null, |
| null |
| ); |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 1, |
| new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of(SHARD_ID0)), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")), |
| null, |
| null |
| ); |
| Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of(SHARD_ID0, "0", SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1", SHARD_ID1, "12")), |
| null, |
| null |
| ); |
| Task id4 = createKinesisIndexTask( |
| "id4", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of(SHARD_ID0)), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")), |
| null, |
| null |
| ); |
| |
| List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4); |
| |
| // Every existing task is reported as running and resolvable from task storage. |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id4")).andReturn(Optional.of(TaskStatus.running("id4"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id4")).andReturn(Optional.of(id4)).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| // id3 must be stopped exactly once (no times()/anyTimes() means a single expected call). |
| EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true)); |
| |
| // Checkpoints are served once each for id1, id2 and id4; there is no checkpoint expectation for id3. |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of(SHARD_ID1, "0")); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0")); |
| TreeMap<Integer, Map<String, String>> checkpoints4 = new TreeMap<>(); |
| checkpoints4.put(0, ImmutableMap.of(SHARD_ID0, "0")); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id4"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints4)) |
| .times(1); |
| |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| // Recording this void call registers one expected shutdown of id4. Arguments are matched with equals(), |
| // so the reason string must match the supervisor's message exactly. |
| taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4"); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| /** |
| * Verifies that a failed supervisor-created task is replaced by a task with a new id and the same base |
| * sequence name. The test runs three passes of {@code runInternal()}: |
| * (1) the initial pass, which is expected to queue four tasks; |
| * (2) a pass where task storage reports all four tasks running (taskQueue is not reset, so its |
| * {@code times(4)} add expectation is already used up and any further add would be unexpected); |
| * (3) a pass where the fourth task reports failure and exactly one {@code taskQueue.add} is expected. |
| */ |
| @Test |
| public void testRequeueTaskWhenFailed() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // CaptureType.ALL keeps every task passed to taskQueue.add, not just the last one. |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4); |
| |
| // Checkpoints are keyed by base sequence name and may be requested any number of times. |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put( |
| 0, |
| ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| ) |
| ); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of( |
| SHARD_ID0, |
| "0" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .anyTimes(); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| |
| // test that running the main loop again checks the status of the tasks that were created and does nothing if they |
| // are all still running |
| EasyMock.reset(taskStorage); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // test that a task failing causes a new task to be re-queued with the same parameters |
| Capture<Task> aNewTaskCapture = Capture.newInstance(); |
| List<Task> imStillAlive = tasks.subList(0, 3); |
| KinesisIndexTask iHaveFailed = (KinesisIndexTask) tasks.get(3); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillAlive).anyTimes(); |
| for (Task task : imStillAlive) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| // Recorded without anyTimes(), so the failed status is served to a single lookup. |
| EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) |
| .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); |
| EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true); |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskQueue); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // The replacement must have a fresh id but reuse the failed task's base sequence name. |
| Assert.assertNotEquals(iHaveFailed.getId(), aNewTaskCapture.getValue().getId()); |
| Assert.assertEquals( |
| iHaveFailed.getIOConfig().getBaseSequenceName(), |
| ((KinesisIndexTask) aNewTaskCapture.getValue()).getIOConfig().getBaseSequenceName() |
| ); |
| } |
| |
| /** |
| * Verifies handling of a task found in task storage rather than created by this supervisor (id1, covering |
| * both shards, with minimumMessageTime = now and maximumMessageTime = now + 60 minutes). |
| * First pass: one {@code taskQueue.add} is expected, and the queued replica must inherit id1's |
| * minimumMessageTime. Second pass: id1 reports failure and the replica reports running; one more add is |
| * expected, and the replacement must keep id1's base sequence name and its minimum/maximum message times. |
| */ |
| @Test |
| public void testRequeueAdoptedTaskWhenFailed() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| DateTime now = DateTimes.nowUtc(); |
| DateTime maxi = now.plusMinutes(60); |
| // NOTE(review): this task uses the literal "stream" where sibling tests use STREAM; presumably the same |
| // value -- confirm. |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| now, |
| maxi |
| ); |
| |
| List<Task> existingTasks = ImmutableList.of(id1); |
| |
| Capture<Task> captured = Capture.newInstance(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| // id1 reports READING once; its start time equals its minimumMessageTime. |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(now)).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(2); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // check that replica tasks are created with the same minimumMessageTime as tasks inherited from another supervisor |
| Assert.assertEquals(now, ((KinesisIndexTask) captured.getValue()).getIOConfig().getMinimumMessageTime().get()); |
| |
| // test that a task failing causes a new task to be re-queued with the same parameters |
| String runningTaskId = captured.getValue().getId(); |
| Capture<Task> aNewTaskCapture = Capture.newInstance(); |
| KinesisIndexTask iHaveFailed = (KinesisIndexTask) existingTasks.get(0); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.reset(taskClient); |
| |
| // for the newly created replica task |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| |
| // Only the replica is active now; id1's failed status is served to a single lookup. |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) |
| .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); |
| EasyMock.expect(taskStorage.getStatus(runningTaskId)) |
| .andReturn(Optional.of(TaskStatus.running(runningTaskId))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask(runningTaskId)).andReturn(Optional.of(captured.getValue())).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(runningTaskId)) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync(runningTaskId)).andReturn(Futures.immediateFuture(now)).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true); |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskQueue); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| Assert.assertNotEquals(iHaveFailed.getId(), aNewTaskCapture.getValue().getId()); |
| Assert.assertEquals( |
| iHaveFailed.getIOConfig().getBaseSequenceName(), |
| ((KinesisIndexTask) aNewTaskCapture.getValue()).getIOConfig().getBaseSequenceName() |
| ); |
| |
| // check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that |
| // task came from another supervisor |
| Assert.assertEquals( |
| now, |
| ((KinesisIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get() |
| ); |
| Assert.assertEquals( |
| maxi, |
| ((KinesisIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get() |
| ); |
| } |
| |
| /** |
| * Verifies the reaction to a successful task. Three passes of {@code runInternal()}: |
| * (1) four tasks are queued; |
| * (2) all four report running, and checkpoints are served exactly twice per base sequence name; |
| * (3) the first task reports success and the other three report running. Exactly two |
| * {@code taskQueue.add} calls and one {@code taskClient.stopAsync(..., false)} call are expected, and the |
| * stopped task's id must contain the succeeded task's base sequence name. |
| */ |
| @Test |
| public void testQueueNextTasksOnSuccess() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| |
| // Second pass: re-stub storage and client so that every queued task reports as running. |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| // NOTE(review): checkpoints1 lists both shards while checkpoints2 lists only SHARD_ID1; confirm these match |
| // the shard sets actually assigned to sequenceName-0 and sequenceName-1. |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| )); |
| // there would be 4 tasks, 2 for each task group |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(2); |
| |
| |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // test that a task succeeding causes a new task to be re-queued with the next stream range and causes any replica |
| // tasks to be shutdown |
| Capture<Task> newTasksCapture = Capture.newInstance(CaptureType.ALL); |
| Capture<String> shutdownTaskIdCapture = Capture.newInstance(); |
| List<Task> imStillRunning = tasks.subList(1, 4); |
| KinesisIndexTask iAmSuccess = (KinesisIndexTask) tasks.get(0); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| // After this reset the only taskClient expectation recorded below is the single stopAsync call. |
| EasyMock.reset(taskClient); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(imStillRunning).anyTimes(); |
| for (Task task : imStillRunning) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.expect(taskStorage.getStatus(iAmSuccess.getId())) |
| .andReturn(Optional.of(TaskStatus.success(iAmSuccess.getId()))); |
| EasyMock.expect(taskStorage.getTask(iAmSuccess.getId())).andReturn(Optional.of(iAmSuccess)).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(newTasksCapture))).andReturn(true).times(2); |
| EasyMock.expect(taskClient.stopAsync(EasyMock.capture(shutdownTaskIdCapture), EasyMock.eq(false))) |
| .andReturn(Futures.immediateFuture(true)); |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskQueue); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // make sure we killed the right task (sequenceName for replicas are the same) |
| Assert.assertTrue(shutdownTaskIdCapture.getValue().contains(iAmSuccess.getIOConfig().getBaseSequenceName())); |
| } |
| |
| /** |
| * Verifies that tasks past their duration are paused, given end offsets, and followed by a new batch of |
| * tasks that start from those end offsets. |
| * <p> |
| * The supervisor is built with "PT1M" as its duration argument. In the second pass of runInternal(), tasks |
| * named sequenceName-0 first report a start time two minutes in the past. The pauseAsync stubs return |
| * {SHARD_ID1=1} and then {SHARD_ID1=3}, setEndOffsetsAsync is expected twice with {SHARD_ID1=3}, and two new |
| * tasks must be queued. Each new task must carry the supervisor's data schema and tuning config, the base |
| * sequence name "sequenceName-0", and a start offset of "3" for SHARD_ID1 with SHARD_ID1 marked exclusive. |
| */ |
| @Test |
| public void testBeginPublishAndQueueNextTasks() throws Exception |
| { |
| final TaskLocation location = new TaskLocation("testHost", 1234, -1); |
| |
| supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // First pass: no active tasks exist, so the supervisor is expected to queue four. |
| final Capture<Task> firstTasks = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata(null) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(firstTasks))).andReturn(true).times(4); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| final List<Task> tasks = firstTasks.getValues(); |
| // Raw type, presumably so it can be handed to andReturn() for getRunningTasks() -- confirm before adding a |
| // type parameter. |
| Collection workItems = new ArrayList<>(); |
| for (Task task : tasks) { |
| workItems.add(new TestTaskRunnerWorkItem(task, null, location)); |
| } |
| |
| // Second pass: all four tasks are running and READING; the sequenceName-0 tasks first report an old start time. |
| EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); |
| final Capture<Task> secondTasks = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .times(2); |
| EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "1" |
| ))) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| ))); |
| EasyMock.expect( |
| taskClient.setEndOffsetsAsync( |
| EasyMock.contains("sequenceName-0"), |
| EasyMock.eq(ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| )), |
| EasyMock.eq(true) |
| ) |
| ).andReturn(Futures.immediateFuture(true)).times(2); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(secondTasks))).andReturn(true).times(2); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of( |
| SHARD_ID0, |
| "0" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(2); |
| |
| EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // taskQueue.add was expected exactly twice, so exactly two follow-up tasks must have been captured. |
| final List<Task> newTasks = secondTasks.getValues(); |
| Assert.assertEquals(2, newTasks.size()); |
| for (Task task : newTasks) { |
| KinesisIndexTask kinesisIndexTask = (KinesisIndexTask) task; |
| Assert.assertEquals(dataSchema, kinesisIndexTask.getDataSchema()); |
| Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), kinesisIndexTask.getTuningConfig()); |
| |
| KinesisIndexTaskIOConfig taskConfig = kinesisIndexTask.getIOConfig(); |
| Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); |
| Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); |
| |
| Assert.assertEquals(STREAM, taskConfig.getStartSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| "3", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| // start sequenceNumbers should be exclusive for the second batch of tasks |
| Assert.assertEquals( |
| ImmutableSet.of(SHARD_ID1), |
| taskConfig.getStartSequenceNumbers().getExclusivePartitions() |
| ); |
| } |
| } |
| |
| @Test |
| public void testDiscoverExistingPublishingTask() throws Exception |
| { |
| final TaskLocation location = new TaskLocation("testHost", 1234, -1); |
| final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) |
| .andReturn(timeLag) |
| .atLeastOnce(); |
| |
| Task task = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(task, null, location)); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false)) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ))); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| )); |
| EasyMock.expect(taskClient.getCheckpoints(EasyMock.anyString(), EasyMock.anyBoolean())) |
| .andReturn(checkpoints) |
| .anyTimes(); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| supervisor.updateCurrentAndLatestOffsets(); |
| SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus(); |
| verifyAll(); |
| |
| Assert.assertEquals(DATASOURCE, report.getId()); |
| |
| KinesisSupervisorReportPayload payload = report.getPayload(); |
| |
| Assert.assertEquals(DATASOURCE, payload.getDataSource()); |
| Assert.assertEquals(3600L, payload.getDurationSeconds()); |
| Assert.assertEquals(2, payload.getPartitions()); |
| Assert.assertEquals(1, payload.getReplicas()); |
| Assert.assertEquals(STREAM, payload.getStream()); |
| Assert.assertEquals(0, payload.getActiveTasks().size()); |
| Assert.assertEquals(1, payload.getPublishingTasks().size()); |
| Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); |
| Assert.assertEquals(0, payload.getRecentErrors().size()); |
| |
| TaskReportData publishingReport = payload.getPublishingTasks().get(0); |
| |
| Assert.assertEquals("id1", publishingReport.getId()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| ), publishingReport.getStartingOffsets()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ), publishingReport.getCurrentOffsets()); |
| |
| KinesisIndexTask capturedTask = captured.getValue(); |
| Assert.assertEquals(dataSchema, capturedTask.getDataSchema()); |
| Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), capturedTask.getTuningConfig()); |
| |
| KinesisIndexTaskIOConfig capturedTaskConfig = capturedTask.getIOConfig(); |
| Assert.assertEquals("awsEndpoint", capturedTaskConfig.getEndpoint()); |
| Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); |
| Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); |
| |
| // check that the new task was created with starting sequences matching where the publishing task finished |
| Assert.assertEquals(STREAM, capturedTaskConfig.getStartSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| "2", |
| capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| "1", |
| capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| |
| Assert.assertEquals(STREAM, capturedTaskConfig.getEndSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| |
| @Test |
| public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception |
| { |
| final TaskLocation location = new TaskLocation("testHost", 1234, -1); |
| final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) |
| .andReturn(timeLag) |
| .atLeastOnce(); |
| |
| Task task = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(task, null, location)); |
| |
| Capture<KinesisIndexTask> captured = Capture.newInstance(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false)) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ))); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| supervisor.updateCurrentAndLatestOffsets(); |
| SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus(); |
| verifyAll(); |
| |
| Assert.assertEquals(DATASOURCE, report.getId()); |
| |
| KinesisSupervisorReportPayload payload = report.getPayload(); |
| |
| Assert.assertEquals(DATASOURCE, payload.getDataSource()); |
| Assert.assertEquals(3600L, payload.getDurationSeconds()); |
| Assert.assertEquals(2, payload.getPartitions()); |
| Assert.assertEquals(1, payload.getReplicas()); |
| Assert.assertEquals(STREAM, payload.getStream()); |
| Assert.assertEquals(0, payload.getActiveTasks().size()); |
| Assert.assertEquals(1, payload.getPublishingTasks().size()); |
| Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); |
| Assert.assertEquals(0, payload.getRecentErrors().size()); |
| |
| TaskReportData publishingReport = payload.getPublishingTasks().get(0); |
| |
| Assert.assertEquals("id1", publishingReport.getId()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| ), publishingReport.getStartingOffsets()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ), publishingReport.getCurrentOffsets()); |
| |
| KinesisIndexTask capturedTask = captured.getValue(); |
| Assert.assertEquals(dataSchema, capturedTask.getDataSchema()); |
| Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), capturedTask.getTuningConfig()); |
| |
| KinesisIndexTaskIOConfig capturedTaskConfig = capturedTask.getIOConfig(); |
| Assert.assertEquals("awsEndpoint", capturedTaskConfig.getEndpoint()); |
| Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); |
| Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); |
| |
| // check that the new task was created with starting sequences matching where the publishing task finished |
| Assert.assertEquals(STREAM, capturedTaskConfig.getStartSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| "2", |
| capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| "1", |
| capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| |
| Assert.assertEquals(STREAM, capturedTaskConfig.getEndSequenceNumbers().getStream()); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertEquals( |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| |
| @Test |
| public void testDiscoverExistingPublishingAndReadingTask() throws Exception |
| { |
| final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); |
| final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); |
| final DateTime startTime = DateTimes.nowUtc(); |
| final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) |
| .andReturn(timeLag) |
| .atLeastOnce(); |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "2", SHARD_ID0, "1"), ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); |
| workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getStatusAsync("id2")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false)) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ))); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskClient.getCurrentOffsetsAsync("id2", false)) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ))); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| // since id1 is publishing, so getCheckpoints wouldn't be called for it |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| supervisor.updateCurrentAndLatestOffsets(); |
| SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus(); |
| verifyAll(); |
| |
| Assert.assertEquals(DATASOURCE, report.getId()); |
| |
| KinesisSupervisorReportPayload payload = report.getPayload(); |
| |
| Assert.assertEquals(DATASOURCE, payload.getDataSource()); |
| Assert.assertEquals(3600L, payload.getDurationSeconds()); |
| Assert.assertEquals(2, payload.getPartitions()); |
| Assert.assertEquals(1, payload.getReplicas()); |
| Assert.assertEquals(STREAM, payload.getStream()); |
| Assert.assertEquals(1, payload.getActiveTasks().size()); |
| Assert.assertEquals(1, payload.getPublishingTasks().size()); |
| Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); |
| Assert.assertEquals(0, payload.getRecentErrors().size()); |
| |
| TaskReportData activeReport = payload.getActiveTasks().get(0); |
| TaskReportData publishingReport = payload.getPublishingTasks().get(0); |
| |
| Assert.assertEquals("id2", activeReport.getId()); |
| Assert.assertEquals(startTime, activeReport.getStartTime()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ), activeReport.getStartingOffsets()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ), activeReport.getCurrentOffsets()); |
| Assert.assertEquals(timeLag, activeReport.getLagMillis()); |
| |
| Assert.assertEquals("id1", publishingReport.getId()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "0", |
| SHARD_ID0, |
| "0" |
| ), publishingReport.getStartingOffsets()); |
| Assert.assertEquals(ImmutableMap.of( |
| SHARD_ID1, |
| "2", |
| SHARD_ID0, |
| "1" |
| ), publishingReport.getCurrentOffsets()); |
| } |
| |
| @Test |
| public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception |
| { |
| supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| |
| EasyMock.reset(taskStorage, taskClient, taskQueue); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| |
| "0", |
| |
| SHARD_ID0, |
| |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0")); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(2); |
| |
| |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(task.getId())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)); |
| EasyMock.expect(taskClient.getStartTimeAsync(task.getId())) |
| .andReturn(Futures.immediateFailedFuture(new RuntimeException())); |
| taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, killing task", task.getId()); |
| } |
| EasyMock.replay(taskStorage, taskClient, taskQueue); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testKillUnresponsiveTasksWhilePausing() throws Exception |
| { |
| final TaskLocation location = new TaskLocation("testHost", 1234, -1); |
| |
| supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| Collection workItems = new ArrayList<>(); |
| for (Task task : tasks) { |
| workItems.add(new TestTaskRunnerWorkItem(task, null, location)); |
| } |
| |
| EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0")); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(2); |
| |
| captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .times(2); |
| EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); |
| taskQueue.shutdown( |
| EasyMock.contains("sequenceName-0"), |
| EasyMock.eq("An exception occured while waiting for task [%s] to pause: [%s]"), |
| EasyMock.contains("sequenceName-0"), |
| EasyMock.anyString() |
| ); |
| EasyMock.expectLastCall().times(2); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| |
| EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| for (Task task : captured.getValues()) { |
| KinesisIndexTaskIOConfig taskConfig = ((KinesisIndexTask) task).getIOConfig(); |
| Assert.assertEquals( |
| "0", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| Assert.assertNull( |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) |
| ); |
| } |
| } |
| |
| @Test |
| public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception |
| { |
| final TaskLocation location = new TaskLocation("testHost", 1234, -1); |
| |
| supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| Collection workItems = new ArrayList<>(); |
| for (Task task : tasks) { |
| workItems.add(new TestTaskRunnerWorkItem(task, null, location)); |
| } |
| |
| EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); |
| checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0")); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(2); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints2)) |
| .times(2); |
| |
| captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1"))) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .times(2); |
| EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0"))) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "1" |
| ))) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| ))); |
| EasyMock.expect( |
| taskClient.setEndOffsetsAsync( |
| EasyMock.contains("sequenceName-0"), |
| EasyMock.eq(ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| )), |
| EasyMock.eq(true) |
| ) |
| ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); |
| taskQueue.shutdown( |
| EasyMock.contains("sequenceName-0"), |
| EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task"), |
| EasyMock.contains("sequenceName-0") |
| ); |
| EasyMock.expectLastCall().times(2); |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| |
| EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| for (Task task : captured.getValues()) { |
| KinesisIndexTaskIOConfig taskConfig = ((KinesisIndexTask) task).getIOConfig(); |
| Assert.assertEquals( |
| "0", |
| taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1) |
| ); |
| } |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testStopNotStarted() |
| { |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| supervisor.stop(false); |
| } |
| |
| @Test |
| public void testStop() |
| { |
| supervisorRecordSupplier.close(); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| taskClient.close(); |
| taskRunner.unregisterListener(StringUtils.format("KinesisSupervisor-%s", DATASOURCE)); |
| replayAll(); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| supervisor.start(); |
| supervisor.stop(false); |
| |
| verifyAll(); |
| } |
| |
| @Test |
| public void testStopGracefully() throws Exception |
| { |
| final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); |
| final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); |
| final DateTime startTime = DateTimes.nowUtc(); |
| |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| "3", |
| SHARD_ID0, |
| "1" |
| ), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); |
| workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getStatusAsync("id2")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStatusAsync("id3")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| "3", |
| SHARD_ID0, |
| "1" |
| )); |
| |
| // getCheckpoints will not be called for id1 as it is in publishing state |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "3", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(taskRunner, taskClient, taskQueue); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskClient.pauseAsync("id2")) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ))); |
| EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ), true)) |
| .andReturn(Futures.immediateFuture(true)); |
| taskQueue.shutdown("id3", "Killing task for graceful shutdown"); |
| EasyMock.expectLastCall().times(1); |
| taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3"); |
| EasyMock.expectLastCall().times(1); |
| |
| EasyMock.replay(taskRunner, taskClient, taskQueue); |
| |
| supervisor.gracefulShutdownInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testResetNoTasks() |
| { |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject())) |
| .andReturn(Collections.emptySet()) |
| .anyTimes(); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); |
| EasyMock.replay(indexerMetadataStorageCoordinator); |
| |
| supervisor.resetInternal(null); |
| verifyAll(); |
| |
| } |
| |
| @Test |
| public void testResetDataSourceMetadata() throws Exception |
| { |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject())) |
| .andReturn(Collections.emptySet()) |
| .anyTimes(); |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| Capture<String> captureDataSource = EasyMock.newCapture(); |
| Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture(); |
| |
| KinesisDataSourceMetadata kinesisDataSourceMetadata = new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ), |
| ImmutableSet.of() |
| ) |
| ); |
| |
| KinesisDataSourceMetadata resetMetadata = new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ), |
| ImmutableSet.of() |
| ) |
| ); |
| |
| KinesisDataSourceMetadata expectedMetadata = new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ), |
| ImmutableSet.of() |
| ) |
| ); |
| |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) |
| .andReturn(kinesisDataSourceMetadata); |
| EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata( |
| EasyMock.capture(captureDataSource), |
| EasyMock.capture(captureDataSourceMetadata) |
| )).andReturn(true); |
| EasyMock.replay(indexerMetadataStorageCoordinator); |
| |
| try { |
| supervisor.resetInternal(resetMetadata); |
| } |
| catch (NullPointerException npe) { |
| // Expected as there will be an attempt to EasyMock.reset partitionGroups sequences to NOT_SET |
| // however there would be no entries in the map as we have not put nay data in kafka |
| Assert.assertNull(npe.getCause()); |
| } |
| verifyAll(); |
| |
| Assert.assertEquals(captureDataSource.getValue(), DATASOURCE); |
| Assert.assertEquals(captureDataSourceMetadata.getValue(), expectedMetadata); |
| } |
| |
| @Test |
| public void testResetNoDataSourceMetadata() |
| { |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject())) |
| .andReturn(Collections.emptySet()) |
| .anyTimes(); |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| KinesisDataSourceMetadata resetMetadata = new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of(SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER), |
| ImmutableSet.of() |
| ) |
| ); |
| |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| // no DataSourceMetadata in metadata store |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null); |
| EasyMock.replay(indexerMetadataStorageCoordinator); |
| |
| supervisor.resetInternal(resetMetadata); |
| verifyAll(); |
| } |
| |
/**
 * Exercises the automatic offset-reset path. The stored metadata references SHARD_ID1 at sequence "100",
 * which is below the earliest sequence ("300") reported by the mocked record supplier. It also references
 * SHARD_ID2, which the record supplier no longer lists. Both metadata resets recorded on the coordinator
 * below are expected during start() + runInternal().
 *
 * NOTE(review): presumably one of the extra boolean arguments to the 8-arg getTestableSupervisor enables
 * resetOffsetAutomatically -- confirm against the helper.
 */
@Test
public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception
{
  supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false);

  supervisorRecordSupplier.assign(EasyMock.anyObject());
  EasyMock.expectLastCall().anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
          .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
          .anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getAssignment())
          .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
          .anyTimes();
  supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
  EasyMock.expectLastCall().anyTimes();
  // Every partition reports earliest "300" and latest "400".
  EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes();
  supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
  EasyMock.expectLastCall().anyTimes();

  EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
  EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
  taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

  EasyMock.reset(indexerMetadataStorageCoordinator);
  // Initially stored metadata: SHARD_ID1 at "100" (older than the earliest available "300") and
  // SHARD_ID2, which is not in the partition list returned by the record supplier.
  // NOTE: a second retrieveDataSourceMetadata expectation with the same argument is recorded further
  // down; the test relies on EasyMock handing out this one (times(2)) before that one.
  EasyMock
      .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
      .andReturn(
          new KinesisDataSourceMetadata(
              new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, "200"))
          )
      )
      .times(2);

  // Since shard 2 was in metadata before but is not in the list of shards returned by the record supplier,
  // it gets deleted from metadata (it is an expired shard)
  EasyMock
      .expect(
          indexerMetadataStorageCoordinator.resetDataSourceMetadata(
              DATASOURCE,
              new KinesisDataSourceMetadata(
                  new SeekableStreamEndSequenceNumbers<>(
                      STREAM,
                      ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, KinesisSequenceNumber.EXPIRED_MARKER)
                  )
              )
          )
      )
      .andReturn(true)
      .times(1);

  // getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
  // Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
  // Instead, subsequent partitions will be reset in the following supervisor runs.
  EasyMock
      .expect(
          indexerMetadataStorageCoordinator.resetDataSourceMetadata(
              DATASOURCE,
              new KinesisDataSourceMetadata(
                  // Only one partition is reset in a single supervisor run.
                  new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of())
              )
          )
      )
      .andReturn(true)
      .times(1);

  // Metadata as seen after the expired SHARD_ID2 entry has been dropped.
  EasyMock
      .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
      .andReturn(
          new KinesisDataSourceMetadata(
              new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100"))
          )
      )
      .times(2);

  replayAll();

  supervisor.start();
  supervisor.runInternal();
  verifyAll();
}
| |
| @Test |
| public void testResetRunningTasks() throws Exception |
| { |
| final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); |
| final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); |
| final DateTime startTime = DateTimes.nowUtc(); |
| |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); |
| workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getStatusAsync("id2")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStatusAsync("id3")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| |
| "3", |
| |
| SHARD_ID0, |
| |
| "1" |
| )); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| |
| "3", |
| |
| SHARD_ID0, |
| |
| "1" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(taskQueue, indexerMetadataStorageCoordinator); |
| EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); |
| taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset"); |
| taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset"); |
| EasyMock.replay(taskQueue, indexerMetadataStorageCoordinator); |
| |
| supervisor.resetInternal(null); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testNoDataIngestionTasks() throws Exception |
| { |
| final DateTime startTime = DateTimes.nowUtc(); |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); |
| |
| //not adding any events |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStatusAsync("id2")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStatusAsync("id3")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "10", |
| SHARD_ID0, |
| "20" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(taskQueue, indexerMetadataStorageCoordinator); |
| EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); |
| taskQueue.shutdown("id1", "DataSourceMetadata is not found while reset"); |
| taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset"); |
| taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset"); |
| EasyMock.replay(taskQueue, indexerMetadataStorageCoordinator); |
| |
| supervisor.resetInternal(null); |
| verifyAll(); |
| } |
| |
| |
/**
 * Verifies that a checkpoint request for task group 0 is handled without the service emitter
 * recording any exception. The group has been moved to pending completion before the request,
 * so it is no longer actively reading.
 */
@Test(timeout = 60_000L)
public void testCheckpointForInactiveTaskGroup() throws InterruptedException
{
  supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
  //not adding any events
  final KinesisIndexTask id1;
  id1 = createKinesisIndexTask(
      "id1",
      DATASOURCE,
      0,
      new SeekableStreamStartSequenceNumbers<>(
          "stream",
          ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
          ImmutableSet.of()
      ),
      new SeekableStreamEndSequenceNumbers<>(
          "stream",
          ImmutableMap.of(
              SHARD_ID1,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
              SHARD_ID0,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
          )
      ),
      null,
      null
  );

  final Task id2 = createKinesisIndexTask(
      "id2",
      DATASOURCE,
      0,
      new SeekableStreamStartSequenceNumbers<>(
          "stream",
          ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"),
          ImmutableSet.of(SHARD_ID0, SHARD_ID1)
      ),
      new SeekableStreamEndSequenceNumbers<>(
          "stream",
          ImmutableMap.of(
              SHARD_ID1,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
              SHARD_ID0,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
          )
      ),
      null,
      null
  );

  final Task id3 = createKinesisIndexTask(
      "id3",
      DATASOURCE,
      0,
      new SeekableStreamStartSequenceNumbers<>(
          "stream",
          ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"),
          ImmutableSet.of(SHARD_ID0, SHARD_ID1)
      ),
      new SeekableStreamEndSequenceNumbers<>(
          "stream",
          ImmutableMap.of(
              SHARD_ID1,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
              SHARD_ID0,
              KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
          )
      ),
      null,
      null
  );

  final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
  final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
  Collection workItems = new ArrayList<>();
  workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
  workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
  // NOTE(review): id2 is added twice and id3 never; this looks like a copy-paste slip (id3 was possibly
  // intended). Left unchanged because it affects which tasks the mocked runner reports.
  workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));

  supervisorRecordSupplier.assign(EasyMock.anyObject());
  EasyMock.expectLastCall().anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
          .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
          .anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getAssignment())
          .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
          .anyTimes();
  supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
  EasyMock.expectLastCall().anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
  EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
  supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
  EasyMock.expectLastCall().anyTimes();

  EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
  EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
  EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
  EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
  EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
  EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
  EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
  EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
  EasyMock.expect(
      indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata(
      null)
  ).anyTimes();
  // All three tasks report READING.
  EasyMock.expect(taskClient.getStatusAsync("id1"))
          .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
  EasyMock.expect(taskClient.getStatusAsync("id2"))
          .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
  EasyMock.expect(taskClient.getStatusAsync("id3"))
          .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));

  final DateTime startTime = DateTimes.nowUtc();
  EasyMock.expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
  EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
  EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));

  // Single checkpoint shared by all tasks; it is also the payload of the checkpoint request below.
  final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
  checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"));
  EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
          .andReturn(Futures.immediateFuture(checkpoints))
          .times(1);
  EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
          .andReturn(Futures.immediateFuture(checkpoints))
          .times(1);
  EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
          .andReturn(Futures.immediateFuture(checkpoints))
          .times(1);

  taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
  replayAll();

  supervisor.start();
  supervisor.runInternal();

  // Make task group 0 inactive, then ask the supervisor to checkpoint it.
  supervisor.moveTaskGroupToPendingCompletion(0);
  supervisor.checkpoint(
      0,
      new KinesisDataSourceMetadata(
          new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), checkpoints.get(0).keySet())
      )
  );

  // Poll until the notice queue drains; the @Test timeout bounds this loop.
  while (supervisor.getNoticesQueueSize() > 0) {
    Thread.sleep(100);
  }

  verifyAll();

  // The message argument is the value itself, so any captured stack trace/message shows up in the failure output.
  Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
  Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
  Assert.assertNull(serviceEmitter.getExceptionClass());
}
| |
| @Test(timeout = 60_000L) |
| public void testCheckpointForUnknownTaskGroup() |
| throws InterruptedException |
| { |
| supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false); |
| |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| //not adding any events |
| final KinesisIndexTask id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| final Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| final Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect( |
| indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( |
| null) |
| ).anyTimes(); |
| |
| replayAll(); |
| |
| supervisor.start(); |
| |
| supervisor.checkpoint( |
| 0, |
| new KinesisDataSourceMetadata( |
| new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.emptyMap(), ImmutableSet.of()) |
| ) |
| ); |
| |
| while (supervisor.getNoticesQueueSize() > 0) { |
| Thread.sleep(100); |
| } |
| |
| verifyAll(); |
| |
| while (serviceEmitter.getStackTrace() == null) { |
| Thread.sleep(100); |
| } |
| |
| Assert.assertTrue(serviceEmitter.getStackTrace() |
| .startsWith("org.apache.druid.java.util.common.ISE: Cannot find")); |
| Assert.assertEquals( |
| "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]", |
| serviceEmitter.getExceptionMessage() |
| ); |
| Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); |
| } |
| |
| @Test |
| public void testSuspendedNoRunningTasks() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); |
| |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject())) |
| .andReturn(Collections.emptySet()) |
| .anyTimes(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| // this asserts that taskQueue.add does not in fact get called because supervisor should be suspended |
| EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andAnswer((IAnswer) () -> { |
| Assert.fail(); |
| return null; |
| }).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testSuspendedRunningTasks() throws Exception |
| { |
| // graceful shutdown is expected to be called on running tasks since state is suspended |
| |
| final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); |
| final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); |
| final DateTime startTime = DateTimes.nowUtc(); |
| |
| supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Task id1 = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id2 = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Task id3 = createKinesisIndexTask( |
| "id3", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1"), |
| ImmutableSet.of(SHARD_ID0, SHARD_ID1) |
| ), |
| new SeekableStreamEndSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, |
| SHARD_ID0, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ), |
| null, |
| null |
| ); |
| |
| Collection workItems = new ArrayList<>(); |
| workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); |
| workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync("id1")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING)); |
| EasyMock.expect(taskClient.getStatusAsync("id2")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStatusAsync("id3")) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); |
| EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of( |
| SHARD_ID1, |
| "3", |
| SHARD_ID0, |
| "1" |
| )); |
| |
| // getCheckpoints will not be called for id1 as it is in publishing state |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "3", |
| SHARD_ID0, |
| "1" |
| )); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(1); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| EasyMock.expect(taskClient.pauseAsync("id2")) |
| .andReturn(Futures.immediateFuture(ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ))); |
| EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( |
| SHARD_ID1, |
| "12", |
| SHARD_ID0, |
| "1" |
| ), true)) |
| .andReturn(Futures.immediateFuture(true)); |
| taskQueue.shutdown("id3", "Killing task for graceful shutdown"); |
| EasyMock.expectLastCall().times(1); |
| taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3"); |
| EasyMock.expectLastCall().times(1); |
| |
| replayAll(); |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testResetSuspended() |
| { |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject())) |
| .andReturn(Collections.emptySet()) |
| .anyTimes(); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| replayAll(); |
| |
| supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); |
| EasyMock.replay(indexerMetadataStorageCoordinator); |
| |
| supervisor.resetInternal(null); |
| verifyAll(); |
| } |
| |
| |
| @Test |
| public void testGetCurrentTotalStats() |
| { |
| supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false); |
| supervisor.setPartitionIdsForTests( |
| ImmutableList.of(SHARD_ID0, SHARD_ID1) |
| ); |
| supervisor.addTaskGroupToActivelyReadingTaskGroup( |
| supervisor.getTaskGroupIdForPartition(SHARD_ID0), |
| ImmutableMap.of("0", "0"), |
| Optional.absent(), |
| Optional.absent(), |
| ImmutableSet.of("task1"), |
| ImmutableSet.of() |
| ); |
| |
| supervisor.addTaskGroupToPendingCompletionTaskGroup( |
| supervisor.getTaskGroupIdForPartition(SHARD_ID1), |
| ImmutableMap.of("0", "0"), |
| Optional.absent(), |
| Optional.absent(), |
| ImmutableSet.of("task2"), |
| ImmutableSet.of() |
| ); |
| |
| EasyMock.expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of( |
| "prop1", |
| "val1" |
| ))).times(1); |
| |
| EasyMock.expect(taskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of( |
| "prop2", |
| "val2" |
| ))).times(1); |
| |
| replayAll(); |
| |
| Map<String, Map<String, Object>> stats = supervisor.getStats(); |
| |
| verifyAll(); |
| |
| Assert.assertEquals(2, stats.size()); |
| Assert.assertEquals(ImmutableSet.of("0", "1"), stats.keySet()); |
| Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("prop1", "val1")), stats.get("0")); |
| Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1")); |
| } |
| |
| @Test |
| public void testDoNotKillCompatibleTasks() |
| throws InterruptedException, EntryExistsException |
| { |
| // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks |
| int numReplicas = 2; |
| supervisor = getTestableSupervisorCustomIsTaskCurrent( |
| numReplicas, |
| 1, |
| true, |
| "PT1H", |
| new Period("P1D"), |
| new Period("P1D"), |
| false, |
| 42, |
| 1000, |
| true |
| ); |
| |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)).anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Task task = createKinesisIndexTask( |
| "id2", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, |
| "0", |
| SHARD_ID1, |
| "0" |
| ), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of( |
| SHARD_ID0, |
| "1", |
| SHARD_ID1, |
| "12" |
| )), |
| null, |
| null |
| ); |
| |
| List<Task> existingTasks = ImmutableList.of(task); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task)).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>(); |
| checkpoints.put(0, ImmutableMap.of( |
| SHARD_ID0, |
| "0", |
| SHARD_ID1, |
| "0" |
| )); |
| |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints)) |
| .times(numReplicas); |
| |
| replayAll(); |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testKillIncompatibleTasks() |
| throws InterruptedException, EntryExistsException |
| { |
| // This supervisor always returns false for isTaskCurrent -> it should kill its tasks |
| int numReplicas = 2; |
| supervisor = getTestableSupervisorCustomIsTaskCurrent( |
| numReplicas, |
| 1, |
| true, |
| "PT1H", |
| new Period("P1D"), |
| new Period("P1D"), |
| false, |
| 42, |
| 1000, |
| false |
| ); |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)).anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) |
| .anyTimes(); |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Task task = createKinesisIndexTask( |
| "id1", |
| DATASOURCE, |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, |
| "0", |
| SHARD_ID1, |
| "0" |
| ), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of( |
| SHARD_ID0, |
| "1", |
| SHARD_ID1, |
| "12" |
| )), |
| null, |
| null |
| ); |
| |
| List<Task> existingTasks = ImmutableList.of(task); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes(); |
| EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); |
| EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); |
| |
| replayAll(); |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testIsTaskCurrent() |
| { |
| DateTime minMessageTime = DateTimes.nowUtc(); |
| DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); |
| |
| KinesisSupervisor supervisor = getSupervisor( |
| 1, |
| 1, |
| true, |
| "PT1H", |
| new Period("P1D"), |
| new Period("P1D"), |
| false, |
| 42, |
| 42, |
| dataSchema, |
| tuningConfig |
| ); |
| |
| supervisor.addTaskGroupToActivelyReadingTaskGroup( |
| 42, |
| ImmutableMap.of(SHARD_ID1, "3"), |
| Optional.of(minMessageTime), |
| Optional.of(maxMessageTime), |
| ImmutableSet.of("id1", "id2", "id3", "id4"), |
| ImmutableSet.of() |
| ); |
| |
| DataSchema modifiedDataSchema = getDataSchema("some other datasource"); |
| |
| KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig( |
| 1000, |
| null, |
| 50000, |
| null, |
| new Period("P1Y"), |
| new File("/test"), |
| null, |
| null, |
| null, |
| true, |
| false, |
| null, |
| null, |
| null, |
| null, |
| numThreads, |
| TEST_CHAT_THREADS, |
| TEST_CHAT_RETRIES, |
| TEST_HTTP_TIMEOUT, |
| TEST_SHUTDOWN_TIMEOUT, |
| null, |
| null, |
| null, |
| 5000, |
| null, |
| null, |
| null, |
| null, |
| 42, // This property is different from tuningConfig |
| null, |
| null, |
| null |
| ); |
| |
| KinesisIndexTask taskFromStorage = createKinesisIndexTask( |
| "id1", |
| 0, |
| new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| ), ImmutableSet.of()), |
| new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| )), |
| minMessageTime, |
| maxMessageTime, |
| dataSchema |
| ); |
| |
| KinesisIndexTask taskFromStorageMismatchedDataSchema = createKinesisIndexTask( |
| "id2", |
| 0, |
| new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| ), ImmutableSet.of()), |
| new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| )), |
| minMessageTime, |
| maxMessageTime, |
| modifiedDataSchema |
| ); |
| |
| KinesisIndexTask taskFromStorageMismatchedTuningConfig = createKinesisIndexTask( |
| "id3", |
| 0, |
| new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| "3" |
| ), ImmutableSet.of()), |
| new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| )), |
| minMessageTime, |
| maxMessageTime, |
| dataSchema, |
| modifiedTuningConfig |
| ); |
| |
| KinesisIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKinesisIndexTask( |
| "id4", |
| 0, |
| new SeekableStreamStartSequenceNumbers<>( |
| "stream", |
| ImmutableMap.of( |
| SHARD_ID1, |
| "4" // this is the mismatch |
| ), |
| ImmutableSet.of() |
| ), |
| new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of( |
| SHARD_ID1, |
| KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| )), |
| minMessageTime, |
| maxMessageTime, |
| dataSchema |
| ); |
| |
| EasyMock.expect(taskStorage.getTask("id1")) |
| .andReturn(Optional.of(taskFromStorage)) |
| .once(); |
| EasyMock.expect(taskStorage.getTask("id2")) |
| .andReturn(Optional.of(taskFromStorageMismatchedDataSchema)) |
| .once(); |
| EasyMock.expect(taskStorage.getTask("id3")) |
| .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig)) |
| .once(); |
| EasyMock.expect(taskStorage.getTask("id4")) |
| .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup)) |
| .once(); |
| |
| replayAll(); |
| |
| Assert.assertTrue(supervisor.isTaskCurrent(42, "id1")); |
| Assert.assertFalse(supervisor.isTaskCurrent(42, "id2")); |
| Assert.assertFalse(supervisor.isTaskCurrent(42, "id3")); |
| Assert.assertFalse(supervisor.isTaskCurrent(42, "id4")); |
| verifyAll(); |
| } |
| |
| @Test |
| public void testShardSplit() throws Exception |
| { |
| supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| List<Task> phaseOneTasks = testShardSplitPhaseOne(); |
| |
| List<Task> phaseTwoTasks = testShardSplitPhaseTwo(phaseOneTasks); |
| |
| testShardSplitPhaseThree(phaseTwoTasks); |
| } |
| |
| private List<Task> testShardSplitPhaseOne() throws Exception |
| { |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID0)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD0_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(1); |
| |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID0, |
| "0" |
| )); |
| // there would be 1 task, since there is only 1 shard |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(1); |
| |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| return tasks; |
| } |
| |
| /** |
| * Test task creation after a shard split with a closed shard |
| * |
| * @param phaseOneTasks List of tasks from the initial phase where only one shard was present |
| */ |
| private List<Task> testShardSplitPhaseTwo(List<Task> phaseOneTasks) throws Exception |
| { |
| // Reset the shared mocks so this phase records a fresh set of expectations; the supervisor instance is reused. |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.reset(taskClient); |
| EasyMock.reset(taskMaster); |
| EasyMock.reset(taskRunner); |
| EasyMock.reset(supervisorRecordSupplier); |
| |
| // first task ran, its shard 0 has reached EOS |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of(SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER) |
| ) |
| ) |
| ).anyTimes(); |
| |
| // After the split the stream lists the closed SHARD_ID0 plus the new shards SHARD_ID1 and SHARD_ID2. |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| // SHARD_ID0 reports END_OF_SHARD_MARKER as its latest sequence number (closed); the new shards report "100". |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID0))) |
| .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1))) |
| .andReturn("100").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) |
| .andReturn("100").anyTimes(); |
| |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // Collects every task submitted to the task queue during the first runInternal() below. |
| Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| // The first phase-one task is reported as SUCCESS; no times() is given, so getStatus is expected exactly once. |
| Task successfulTask = phaseOneTasks.get(0); |
| EasyMock.expect(taskStorage.getStatus(successfulTask.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask.getId())).andReturn(Optional.of(successfulTask)).anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| |
| // Exactly two new tasks must be submitted, one per task group. |
| EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(2); |
| |
| replayAll(); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // Second pass: only taskStorage and taskClient are re-recorded so the tasks created above show up as active. |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>(); |
| checkpointsGroup0.put(0, ImmutableMap.of( |
| SHARD_ID1, "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>(); |
| checkpointsGroup1.put(1, ImmutableMap.of( |
| SHARD_ID2, "0" |
| )); |
| // there would be 2 tasks, 1 for each task group |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup0)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup1)) |
| .times(1); |
| |
| // Report every captured task as active and running. |
| List<Task> postSplitTasks = postSplitCaptured.getValues(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes(); |
| for (Task task : postSplitTasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // Check that shardId-000000000000 which has hit EOS is not included in the sequences sent to the task for group 0 |
| SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, "0" |
| ), |
| ImmutableSet.of() |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| SeekableStreamStartSequenceNumbers<String, String> group1ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, "0" |
| ), |
| ImmutableSet.of() |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group1ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| Assert.assertEquals(2, postSplitTasks.size()); |
| KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig(); |
| KinesisIndexTaskIOConfig group1Config = ((KinesisIndexTask) postSplitTasks.get(1)).getIOConfig(); |
| Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId()); |
| Assert.assertEquals((Integer) 1, group1Config.getTaskGroupId()); |
| Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers()); |
| Assert.assertEquals(group1ExpectedStartSequenceNumbers, group1Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group1ExpectedEndSequenceNumbers, group1Config.getEndSequenceNumbers()); |
| |
| // Handed to phase three, which treats these tasks as having completed successfully. |
| return postSplitTasks; |
| } |
| |
| /** |
| * Test task creation after a shard split with a closed shard, with the closed shard expiring and no longer |
| * being returned from the record supplier. |
| * |
| * @param phaseTwoTasks List of tasks from the second phase where closed but not expired shards were present. |
| */ |
| private void testShardSplitPhaseThree(List<Task> phaseTwoTasks) throws Exception |
| { |
| // Reset the shared mocks so this phase records a fresh set of expectations; the supervisor instance is reused. |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.reset(taskClient); |
| EasyMock.reset(taskMaster); |
| EasyMock.reset(taskRunner); |
| EasyMock.reset(supervisorRecordSupplier); |
| |
| // second set of tasks ran, shard 0 has expired, but shard 1 and 2 have data |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER, |
| SHARD_ID1, "100", |
| SHARD_ID2, "100" |
| ) |
| ) |
| ) |
| ).anyTimes(); |
| |
| // Allow (anyTimes, so not required) a metadata reset that replaces SHARD_ID0's END_OF_SHARD_MARKER |
| // with EXPIRED_MARKER while keeping the other shards' sequence numbers. |
| EasyMock.expect( |
| indexerMetadataStorageCoordinator.resetDataSourceMetadata( |
| DATASOURCE, |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER, |
| SHARD_ID1, "100", |
| SHARD_ID2, "100" |
| ) |
| ) |
| ) |
| ) |
| ).andReturn(true).anyTimes(); |
| |
| // The expired SHARD_ID0 is no longer listed by the record supplier. |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID2)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD2_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1))) |
| .andReturn("200").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) |
| .andReturn("200").anyTimes(); |
| |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // Collects every task submitted to the task queue during the first runInternal() below. |
| Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| |
| // Both phase-two tasks are reported as SUCCESS; no times() is given, so each getStatus is expected once. |
| Task successfulTask0 = phaseTwoTasks.get(0); |
| EasyMock.expect(taskStorage.getStatus(successfulTask0.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes(); |
| |
| Task successfulTask1 = phaseTwoTasks.get(1); |
| EasyMock.expect(taskStorage.getStatus(successfulTask1.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask1.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask1.getId())).andReturn(Optional.of(successfulTask1)).anyTimes(); |
| |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| |
| // Exactly two new tasks must be submitted, one per task group. |
| EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(2); |
| |
| replayAll(); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // Second pass: only taskStorage and taskClient are re-recorded so the tasks created above show up as active. |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| // NOTE(review): these checkpoints pair sequenceName-0 with SHARD_ID2 and sequenceName-1 with SHARD_ID1, while |
| // the assertions below expect group 0 to own SHARD_ID1 and group 1 to own SHARD_ID2. They look swapped; confirm |
| // whether the checkpoint contents are significant for this phase. |
| TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>(); |
| checkpointsGroup0.put(0, ImmutableMap.of( |
| SHARD_ID2, "100" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>(); |
| checkpointsGroup1.put(1, ImmutableMap.of( |
| SHARD_ID1, "100" |
| )); |
| // there would be 2 tasks, 1 for each task group |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup0)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup1)) |
| .times(1); |
| |
| // Report every captured task as active and running. |
| List<Task> postSplitTasks = postSplitCaptured.getValues(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes(); |
| for (Task task : postSplitTasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| |
| // Check that SHARD_ID0, which has expired, is not included in the sequences sent to either task group. |
| // NOTE(review): unlike phase two, the third start-sequence argument is now non-empty; presumably the stored |
| // offset "100" is treated as exclusive when resuming from metadata. Confirm. |
| SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, "100" |
| ), |
| ImmutableSet.of(SHARD_ID1) |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| SeekableStreamStartSequenceNumbers<String, String> group1ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, "100" |
| ), |
| ImmutableSet.of(SHARD_ID2) |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group1ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| Assert.assertEquals(2, postSplitTasks.size()); |
| KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig(); |
| KinesisIndexTaskIOConfig group1Config = ((KinesisIndexTask) postSplitTasks.get(1)).getIOConfig(); |
| Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId()); |
| Assert.assertEquals((Integer) 1, group1Config.getTaskGroupId()); |
| Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers()); |
| Assert.assertEquals(group1ExpectedStartSequenceNumbers, group1Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group1ExpectedEndSequenceNumbers, group1Config.getEndSequenceNumbers()); |
| |
| // The expired shard is dropped from the partition groups ... |
| Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of( |
| 0, ImmutableSet.of(SHARD_ID1), |
| 1, ImmutableSet.of(SHARD_ID2) |
| ); |
| Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups()); |
| |
| // ... but all three shards, including the expired SHARD_ID0, are still expected to map to "-1" here. |
| ConcurrentHashMap<String, String> expectedPartitionOffsets = new ConcurrentHashMap<>( |
| ImmutableMap.of( |
| SHARD_ID2, "-1", |
| SHARD_ID1, "-1", |
| SHARD_ID0, "-1" |
| ) |
| ); |
| Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets()); |
| |
| } |
| |
| @Test |
| public void testShardMerge() throws Exception |
| { |
| // One replica, task count of two, reading from the earliest offset, one-hour task duration. |
| supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); |
| taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); |
| |
| // Each phase consumes the tasks produced by the phase before it; Java evaluates the |
| // innermost call first, so the phases still run in order one, two, three. |
| testShardMergePhaseThree(testShardMergePhaseTwo(testShardMergePhaseOne())); |
| } |
| |
| /** |
| * Initial phase of the shard merge test: the stream has two shards (SHARD_ID0 and SHARD_ID1) and the stored |
| * datasource metadata is empty, so after start() the supervisor should create one task per task group. |
| * |
| * @return the two tasks submitted to the task queue in this phase |
| */ |
| private List<Task> testShardMergePhaseOne() throws Exception |
| { |
| supervisorRecordSupplier.assign(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // Collects every task submitted to the task queue during the first runInternal() below. |
| Capture<Task> captured = Capture.newInstance(CaptureType.ALL); |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| // No previously stored offsets for this datasource. |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| null |
| ) |
| ).anyTimes(); |
| |
| // Exactly two tasks must be submitted, one per task group. |
| EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); |
| |
| replayAll(); |
| |
| supervisor.start(); |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| List<Task> tasks = captured.getValues(); |
| |
| // Second pass: only taskStorage and taskClient are re-recorded so the tasks created above show up as active. |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| |
| TreeMap<Integer, Map<String, String>> checkpoints0 = new TreeMap<>(); |
| checkpoints0.put(0, ImmutableMap.of( |
| SHARD_ID1, |
| "0" |
| )); |
| TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); |
| checkpoints1.put(0, ImmutableMap.of( |
| SHARD_ID0, |
| "0" |
| )); |
| // there would be 2 tasks, 1 for each task group |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints0)) |
| .times(1); |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpoints1)) |
| .times(1); |
| |
| // Report every captured task as active and running. |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes(); |
| for (Task task : tasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| return tasks; |
| } |
| |
| /** |
| * Test task creation after a shard merge: SHARD_ID0 and SHARD_ID1 have been closed and merged into SHARD_ID2, |
| * while the closed shards are still returned by the record supplier. |
| * |
| * @param phaseOneTasks List of tasks from the initial phase, where SHARD_ID0 and SHARD_ID1 were the only shards |
| * @return the single task created in this phase (for task group 0) |
| */ |
| private List<Task> testShardMergePhaseTwo(List<Task> phaseOneTasks) throws Exception |
| { |
| // Reset the shared mocks so this phase records a fresh set of expectations; the supervisor instance is reused. |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.reset(taskClient); |
| EasyMock.reset(taskMaster); |
| EasyMock.reset(taskRunner); |
| EasyMock.reset(supervisorRecordSupplier); |
| |
| // first tasks ran, both shard 0 and shard 1 have reached EOS, merged into shard 2 |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER, |
| SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER |
| ) |
| ) |
| ) |
| ).anyTimes(); |
| |
| // The closed shards are still listed alongside the new SHARD_ID2. |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID0))) |
| .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1))) |
| .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) |
| .andReturn("100").anyTimes(); |
| |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| // Collects every task submitted to the task queue during the first runInternal() below. |
| Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| |
| // Both phase-one tasks are reported as SUCCESS; no times() is given, so each getStatus is expected once. |
| Task successfulTask0 = phaseOneTasks.get(0); |
| Task successfulTask1 = phaseOneTasks.get(1); |
| EasyMock.expect(taskStorage.getStatus(successfulTask0.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes(); |
| |
| EasyMock.expect(taskStorage.getStatus(successfulTask1.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask1.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask1.getId())).andReturn(Optional.of(successfulTask1)).anyTimes(); |
| |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| |
| // Only a single task is expected now (see the checkpoint comment below). |
| EasyMock.expect(taskQueue.add(EasyMock.capture(postMergeCaptured))).andReturn(true).times(1); |
| |
| replayAll(); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // Second pass: only taskStorage and taskClient are re-recorded so the task created above shows up as active. |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>(); |
| checkpointsGroup0.put(0, ImmutableMap.of( |
| SHARD_ID2, "0", |
| SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER |
| )); |
| |
| // there would be only 1 task: task group 1 only has closed shards, so no task is created for it |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup0)) |
| .times(1); |
| |
| // Report every captured task as active and running. |
| List<Task> postMergeTasks = postMergeCaptured.getValues(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postMergeTasks).anyTimes(); |
| for (Task task : postMergeTasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| // Check that SHARD_ID1, which has hit EOS (its checkpoint above is END_OF_SHARD_MARKER), is not included in |
| // the sequences sent to the task for group 0; only the new SHARD_ID2 remains. |
| SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, "0" |
| ), |
| ImmutableSet.of() |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| Assert.assertEquals(1, postMergeTasks.size()); |
| KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postMergeTasks.get(0)).getIOConfig(); |
| Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId()); |
| Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers()); |
| |
| return postMergeTasks; |
| } |
| |
| /** |
| * Test task creation after a shard merge with two closed shards and one open shard, with the closed shards |
| * expiring and no longer being returned from record supplier. |
| * |
| * @param phaseTwoTasks List of tasks from the second phase where closed but not expired shards were present. |
| */ |
| private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception |
| { |
| EasyMock.reset(indexerMetadataStorageCoordinator); |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskQueue); |
| EasyMock.reset(taskClient); |
| EasyMock.reset(taskMaster); |
| EasyMock.reset(taskRunner); |
| EasyMock.reset(supervisorRecordSupplier); |
| |
| // second set of tasks ran, shard 0 has expired, but shard 1 and 2 have data |
| EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER, |
| SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, |
| SHARD_ID2, "100" |
| ) |
| ) |
| ) |
| ).anyTimes(); |
| |
| EasyMock.expect( |
| indexerMetadataStorageCoordinator.resetDataSourceMetadata( |
| DATASOURCE, |
| new KinesisDataSourceMetadata( |
| new SeekableStreamEndSequenceNumbers<String, String>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER, |
| SHARD_ID1, KinesisSequenceNumber.EXPIRED_MARKER, |
| SHARD_ID2, "100" |
| ) |
| ) |
| ) |
| ) |
| ).andReturn(true).anyTimes(); |
| |
| EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) |
| .andReturn(ImmutableSet.of(SHARD_ID2)) |
| .anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getAssignment()) |
| .andReturn(ImmutableSet.of(SHARD2_PARTITION)) |
| .anyTimes(); |
| |
| supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); |
| EasyMock.expectLastCall().anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); |
| EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) |
| .andReturn("200").anyTimes(); |
| |
| supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); |
| EasyMock.expectLastCall().anyTimes(); |
| |
| Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL); |
| |
| EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); |
| EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); |
| EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| |
| Task successfulTask0 = phaseTwoTasks.get(0); |
| EasyMock.expect(taskStorage.getStatus(successfulTask0.getId())) |
| .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId()))); |
| EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes(); |
| |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| |
| EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(1); |
| |
| replayAll(); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| EasyMock.reset(taskStorage); |
| EasyMock.reset(taskClient); |
| |
| EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) |
| .anyTimes(); |
| EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) |
| .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) |
| .anyTimes(); |
| TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>(); |
| checkpointsGroup0.put(0, ImmutableMap.of( |
| SHARD_ID2, "100" |
| )); |
| |
| // there would be 1 task, only task group 0 has a shard |
| EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) |
| .andReturn(Futures.immediateFuture(checkpointsGroup0)) |
| .times(1); |
| |
| List<Task> postSplitTasks = postSplitCaptured.getValues(); |
| EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes(); |
| for (Task task : postSplitTasks) { |
| EasyMock.expect(taskStorage.getStatus(task.getId())) |
| .andReturn(Optional.of(TaskStatus.running(task.getId()))) |
| .anyTimes(); |
| EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); |
| } |
| EasyMock.replay(taskStorage); |
| EasyMock.replay(taskClient); |
| |
| supervisor.runInternal(); |
| verifyAll(); |
| |
| |
| // Check that shardId-000000000000 which has hit EOS is not included in the sequences sent to the task for group 0 |
| SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers = |
| new SeekableStreamStartSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, "100" |
| ), |
| ImmutableSet.of(SHARD_ID2) |
| ); |
| |
| SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers = |
| new SeekableStreamEndSequenceNumbers<>( |
| STREAM, |
| ImmutableMap.of( |
| SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER |
| ) |
| ); |
| |
| Assert.assertEquals(1, postSplitTasks.size()); |
| KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig(); |
| Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId()); |
| Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers()); |
| Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers()); |
| |
| Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of( |
| 0, ImmutableSet.of(SHARD_ID2), |
| 1, ImmutableSet.of() |
| ); |
| ConcurrentHashMap<String, String> expectedPartitionOffsets = new ConcurrentHashMap<>( |
| ImmutableMap.of( |
| SHARD_ID2, "-1", |
| SHARD_ID1, "-1", |
| SHARD_ID0, "-1" |
| ) |
| ); |
| Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups()); |
| Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets()); |
| } |
| |
| private TestableKinesisSupervisor getTestableSupervisor( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod, |
| boolean suspended |
| ) |
| { |
| return getTestableSupervisor( |
| replicas, |
| taskCount, |
| useEarliestOffset, |
| false, |
| duration, |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| suspended |
| ); |
| } |
| |
| /** |
| * Builds a {@link TestableKinesisSupervisor} wired to this test's mocks (taskStorage, taskMaster, |
| * indexerMetadataStorageCoordinator) and to a task client factory that always returns the taskClient mock. |
| * |
| * @param replicas                   passed to the supervisor IO config |
| * @param taskCount                  passed to the supervisor IO config |
| * @param useEarliestOffset          passed to the supervisor IO config |
| * @param resetOffsetAutomatically   passed to the supervisor tuning config |
| * @param duration                   parsed as a {@link Period} and passed to the supervisor IO config |
| * @param lateMessageRejectionPeriod passed to the supervisor IO config (may be null) |
| * @param earlyMessageRejectionPeriod passed to the supervisor IO config (may be null) |
| * @param suspended                  passed to the supervisor spec |
| * @return a new supervisor instance; it is not started |
| */ |
| private TestableKinesisSupervisor getTestableSupervisor( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| boolean resetOffsetAutomatically, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod, |
| boolean suspended |
| ) |
| { |
| KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( |
| STREAM, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| replicas, |
| taskCount, |
| new Period(duration), |
| new Period("P1D"), |
| new Period("PT30S"), |
| useEarliestOffset, |
| new Period("PT30M"), |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| null, |
| null, |
| null, |
| null, |
| null, |
| false |
| ); |
| |
| // Factory that checks the chat settings the supervisor passes in against the TEST_* constants and then |
| // hands back the shared taskClient mock instead of building a real client. |
| KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( |
| null, |
| null |
| ) |
| { |
| @Override |
| public KinesisIndexTaskClient build( |
| TaskInfoProvider taskInfoProvider, |
| String dataSource, |
| int numThreads, |
| Duration httpTimeout, |
| long numRetries |
| ) |
| { |
| Assert.assertEquals(TEST_CHAT_THREADS, numThreads); |
| Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); |
| Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); |
| return taskClient; |
| } |
| }; |
| |
| // The TEST_CHAT_THREADS / TEST_CHAT_RETRIES / TEST_HTTP_TIMEOUT values given here must match the |
| // assertions in the factory above. |
| final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig( |
| 1000, |
| null, |
| 50000, |
| null, |
| new Period("P1Y"), |
| new File("/test"), |
| null, |
| null, |
| null, |
| true, |
| false, |
| null, |
| resetOffsetAutomatically, |
| null, |
| null, |
| numThreads, |
| TEST_CHAT_THREADS, |
| TEST_CHAT_RETRIES, |
| TEST_HTTP_TIMEOUT, |
| TEST_SHUTDOWN_TIMEOUT, |
| null, |
| null, |
| null, |
| 5000, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null |
| ); |
| |
| // The same mocks and factory are passed both to the spec and directly to the supervisor. |
| return new TestableKinesisSupervisor( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new KinesisSupervisorSpec( |
| null, |
| dataSchema, |
| tuningConfig, |
| kinesisSupervisorIOConfig, |
| null, |
| suspended, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new NoopServiceEmitter(), |
| new DruidMonitorSchedulerConfig(), |
| rowIngestionMetersFactory, |
| null, |
| new SupervisorStateManagerConfig() |
| ), |
| rowIngestionMetersFactory |
| ); |
| } |
| |
| private TestableKinesisSupervisor getTestableSupervisor( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod |
| ) |
| { |
| return getTestableSupervisor( |
| replicas, |
| taskCount, |
| useEarliestOffset, |
| duration, |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| false, |
| null, |
| null |
| ); |
| } |
| |
| private TestableKinesisSupervisor getTestableSupervisor( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod, |
| boolean suspended, |
| Integer recordsPerFetch, |
| Integer fetchDelayMillis |
| ) |
| { |
| KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( |
| STREAM, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| replicas, |
| taskCount, |
| new Period(duration), |
| new Period("P1D"), |
| new Period("PT30S"), |
| useEarliestOffset, |
| new Period("PT30M"), |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| null, |
| recordsPerFetch, |
| fetchDelayMillis, |
| null, |
| null, |
| false |
| ); |
| |
| KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( |
| null, |
| null |
| ) |
| { |
| @Override |
| public KinesisIndexTaskClient build( |
| TaskInfoProvider taskInfoProvider, |
| String dataSource, |
| int numThreads, |
| Duration httpTimeout, |
| long numRetries |
| ) |
| { |
| Assert.assertEquals(TEST_CHAT_THREADS, numThreads); |
| Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); |
| Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); |
| return taskClient; |
| } |
| }; |
| |
| return new TestableKinesisSupervisor( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new KinesisSupervisorSpec( |
| null, |
| dataSchema, |
| tuningConfig, |
| kinesisSupervisorIOConfig, |
| null, |
| suspended, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new NoopServiceEmitter(), |
| new DruidMonitorSchedulerConfig(), |
| rowIngestionMetersFactory, |
| null, |
| supervisorConfig |
| ), |
| rowIngestionMetersFactory |
| ); |
| } |
| |
| /** |
| * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent() |
| */ |
| private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod, |
| boolean suspended, |
| Integer recordsPerFetch, |
| Integer fetchDelayMillis, |
| boolean isTaskCurrentReturn |
| ) |
| { |
| KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( |
| STREAM, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| replicas, |
| taskCount, |
| new Period(duration), |
| new Period("P1D"), |
| new Period("PT30S"), |
| useEarliestOffset, |
| new Period("PT30M"), |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| null, |
| recordsPerFetch, |
| fetchDelayMillis, |
| null, |
| null, |
| false |
| ); |
| |
| KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( |
| null, |
| null |
| ) |
| { |
| @Override |
| public KinesisIndexTaskClient build( |
| TaskInfoProvider taskInfoProvider, |
| String dataSource, |
| int numThreads, |
| Duration httpTimeout, |
| long numRetries |
| ) |
| { |
| Assert.assertEquals(TEST_CHAT_THREADS, numThreads); |
| Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); |
| Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); |
| return taskClient; |
| } |
| }; |
| |
| return new TestableKinesisSupervisorWithCustomIsTaskCurrent( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new KinesisSupervisorSpec( |
| null, |
| dataSchema, |
| tuningConfig, |
| kinesisSupervisorIOConfig, |
| null, |
| suspended, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new NoopServiceEmitter(), |
| new DruidMonitorSchedulerConfig(), |
| rowIngestionMetersFactory, |
| null, |
| supervisorConfig |
| ), |
| rowIngestionMetersFactory, |
| isTaskCurrentReturn |
| ); |
| } |
| |
| /** |
| * Use for tests where you don't want generateSequenceName to be overridden out |
| */ |
| private KinesisSupervisor getSupervisor( |
| int replicas, |
| int taskCount, |
| boolean useEarliestOffset, |
| String duration, |
| Period lateMessageRejectionPeriod, |
| Period earlyMessageRejectionPeriod, |
| boolean suspended, |
| Integer recordsPerFetch, |
| Integer fetchDelayMillis, |
| DataSchema dataSchema, |
| KinesisSupervisorTuningConfig tuningConfig |
| ) |
| { |
| KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( |
| STREAM, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| replicas, |
| taskCount, |
| new Period(duration), |
| new Period("P1D"), |
| new Period("PT30S"), |
| useEarliestOffset, |
| new Period("PT30M"), |
| lateMessageRejectionPeriod, |
| earlyMessageRejectionPeriod, |
| null, |
| recordsPerFetch, |
| fetchDelayMillis, |
| null, |
| null, |
| false |
| ); |
| |
| KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory( |
| null, |
| null |
| ) |
| { |
| @Override |
| public KinesisIndexTaskClient build( |
| TaskInfoProvider taskInfoProvider, |
| String dataSource, |
| int numThreads, |
| Duration httpTimeout, |
| long numRetries |
| ) |
| { |
| Assert.assertEquals(TEST_CHAT_THREADS, numThreads); |
| Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); |
| Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); |
| return taskClient; |
| } |
| }; |
| |
| return new KinesisSupervisor( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new KinesisSupervisorSpec( |
| null, |
| dataSchema, |
| tuningConfig, |
| kinesisSupervisorIOConfig, |
| null, |
| suspended, |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| OBJECT_MAPPER, |
| new NoopServiceEmitter(), |
| new DruidMonitorSchedulerConfig(), |
| rowIngestionMetersFactory, |
| null, |
| supervisorConfig |
| ), |
| rowIngestionMetersFactory, |
| null |
| ); |
| } |
| |
| private static DataSchema getDataSchema(String dataSource) |
| { |
| List<DimensionSchema> dimensions = new ArrayList<>(); |
| dimensions.add(StringDimensionSchema.create("dim1")); |
| dimensions.add(StringDimensionSchema.create("dim2")); |
| |
| return new DataSchema( |
| dataSource, |
| new TimestampSpec("timestamp", "iso", null), |
| new DimensionsSpec( |
| dimensions, |
| null, |
| null |
| ), |
| new AggregatorFactory[]{new CountAggregatorFactory("rows")}, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.NONE, |
| ImmutableList.of() |
| ), |
| null |
| ); |
| } |
| |
| |
| private KinesisIndexTask createKinesisIndexTask( |
| String id, |
| String dataSource, |
| int taskGroupId, |
| SeekableStreamStartSequenceNumbers<String, String> startPartitions, |
| SeekableStreamEndSequenceNumbers<String, String> endPartitions, |
| DateTime minimumMessageTime, |
| DateTime maximumMessageTime |
| ) |
| { |
| return createKinesisIndexTask( |
| id, |
| taskGroupId, |
| startPartitions, |
| endPartitions, |
| minimumMessageTime, |
| maximumMessageTime, |
| getDataSchema(dataSource) |
| ); |
| } |
| |
| private KinesisIndexTask createKinesisIndexTask( |
| String id, |
| int taskGroupId, |
| SeekableStreamStartSequenceNumbers<String, String> startPartitions, |
| SeekableStreamEndSequenceNumbers<String, String> endPartitions, |
| DateTime minimumMessageTime, |
| DateTime maximumMessageTime, |
| DataSchema dataSchema |
| ) |
| { |
| return createKinesisIndexTask( |
| id, |
| taskGroupId, |
| startPartitions, |
| endPartitions, |
| minimumMessageTime, |
| maximumMessageTime, |
| dataSchema, |
| (KinesisIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig() |
| ); |
| } |
| |
| private KinesisIndexTask createKinesisIndexTask( |
| String id, |
| int taskGroupId, |
| SeekableStreamStartSequenceNumbers<String, String> startPartitions, |
| SeekableStreamEndSequenceNumbers<String, String> endPartitions, |
| DateTime minimumMessageTime, |
| DateTime maximumMessageTime, |
| DataSchema dataSchema, |
| KinesisIndexTaskTuningConfig tuningConfig |
| ) |
| { |
| return new KinesisIndexTask( |
| id, |
| null, |
| dataSchema, |
| tuningConfig, |
| new KinesisIndexTaskIOConfig( |
| 0, |
| "sequenceName-" + taskGroupId, |
| startPartitions, |
| endPartitions, |
| true, |
| minimumMessageTime, |
| maximumMessageTime, |
| INPUT_FORMAT, |
| "awsEndpoint", |
| null, |
| null, |
| null, |
| null, |
| false |
| ), |
| Collections.emptyMap(), |
| null |
| ); |
| } |
| |
| private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem |
| { |
| private final String taskType; |
| private final TaskLocation location; |
| private final String dataSource; |
| |
| public TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location) |
| { |
| super(task.getId(), result); |
| this.taskType = task.getType(); |
| this.location = location; |
| this.dataSource = task.getDataSource(); |
| } |
| |
| @Override |
| public TaskLocation getLocation() |
| { |
| return location; |
| } |
| |
| @Override |
| public String getTaskType() |
| { |
| return taskType; |
| } |
| |
| @Override |
| public String getDataSource() |
| { |
| return dataSource; |
| } |
| |
| } |
| |
| private class TestableKinesisSupervisor extends KinesisSupervisor |
| { |
| TestableKinesisSupervisor( |
| TaskStorage taskStorage, |
| TaskMaster taskMaster, |
| IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, |
| KinesisIndexTaskClientFactory taskClientFactory, |
| ObjectMapper mapper, |
| KinesisSupervisorSpec spec, |
| RowIngestionMetersFactory rowIngestionMetersFactory |
| ) |
| { |
| super( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| mapper, |
| spec, |
| rowIngestionMetersFactory, |
| null |
| ); |
| } |
| |
| @Override |
| protected String generateSequenceName( |
| Map<String, String> startPartitions, |
| Optional<DateTime> minimumMessageTime, |
| Optional<DateTime> maximumMessageTime, |
| DataSchema dataSchema, |
| SeekableStreamIndexTaskTuningConfig tuningConfig |
| ) |
| { |
| if (startPartitions.isEmpty()) { |
| return StringUtils.format("sequenceName-NOT-USED"); |
| } |
| final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()); |
| return StringUtils.format("sequenceName-%d", groupId); |
| } |
| |
| @Override |
| protected RecordSupplier<String, String> setupRecordSupplier() |
| { |
| return supervisorRecordSupplier; |
| } |
| |
| private SeekableStreamSupervisorStateManager getStateManager() |
| { |
| return stateManager; |
| } |
| } |
| |
| private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor |
| { |
| private final boolean isTaskCurrentReturn; |
| |
| TestableKinesisSupervisorWithCustomIsTaskCurrent( |
| TaskStorage taskStorage, |
| TaskMaster taskMaster, |
| IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, |
| KinesisIndexTaskClientFactory taskClientFactory, |
| ObjectMapper mapper, |
| KinesisSupervisorSpec spec, |
| RowIngestionMetersFactory rowIngestionMetersFactory, |
| boolean isTaskCurrentReturn |
| ) |
| { |
| super( |
| taskStorage, |
| taskMaster, |
| indexerMetadataStorageCoordinator, |
| taskClientFactory, |
| mapper, |
| spec, |
| rowIngestionMetersFactory |
| ); |
| this.isTaskCurrentReturn = isTaskCurrentReturn; |
| } |
| |
| @Override |
| public boolean isTaskCurrent(int taskGroupId, String taskId) |
| { |
| return isTaskCurrentReturn; |
| } |
| } |
| } |