| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.worker; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.jsontype.NamedType; |
| import com.google.common.base.Joiner; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.retry.ExponentialBackoffRetry; |
| import org.apache.curator.test.TestingCluster; |
| import org.apache.druid.client.indexing.NoopIndexingServiceClient; |
| import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; |
| import org.apache.druid.discovery.DruidLeaderClient; |
| import org.apache.druid.indexer.TaskState; |
| import org.apache.druid.indexing.common.IndexingServiceCondition; |
| import org.apache.druid.indexing.common.SegmentCacheManagerFactory; |
| import org.apache.druid.indexing.common.TaskToolboxFactory; |
| import org.apache.druid.indexing.common.TestRealtimeTask; |
| import org.apache.druid.indexing.common.TestTasks; |
| import org.apache.druid.indexing.common.TestUtils; |
| import org.apache.druid.indexing.common.actions.TaskActionClient; |
| import org.apache.druid.indexing.common.actions.TaskActionClientFactory; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.common.task.TestAppenderatorsManager; |
| import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner; |
| import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig; |
| import org.apache.druid.indexing.worker.config.WorkerConfig; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.IndexMergerV9; |
| import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; |
| import org.apache.druid.segment.join.NoopJoinableFactory; |
| import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; |
| import org.apache.druid.server.DruidNode; |
| import org.apache.druid.server.initialization.IndexerZkConfig; |
| import org.apache.druid.server.initialization.ServerConfig; |
| import org.apache.druid.server.initialization.ZkPathsConfig; |
| import org.apache.druid.server.metrics.NoopServiceEmitter; |
| import org.apache.druid.server.security.AuthTestUtils; |
| import org.easymock.EasyMock; |
| import org.joda.time.Period; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.util.List; |
| |
| /** |
| * |
| */ |
| public class WorkerTaskMonitorTest |
| { |
| private static final Joiner JOINER = Joiner.on("/"); |
| private static final String BASE_PATH = "/test/druid"; |
| private static final String TASKS_PATH = StringUtils.format("%s/indexer/tasks/worker", BASE_PATH); |
| private static final String STATUS_PATH = StringUtils.format("%s/indexer/status/worker", BASE_PATH); |
| private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", false, 9000, null, true, false); |
| |
| private TestingCluster testingCluster; |
| private CuratorFramework cf; |
| private WorkerCuratorCoordinator workerCuratorCoordinator; |
| private WorkerTaskMonitor workerTaskMonitor; |
| |
| private Task task; |
| |
| private Worker worker; |
| private final TestUtils testUtils; |
| private ObjectMapper jsonMapper; |
| private IndexMergerV9 indexMergerV9; |
| private IndexIO indexIO; |
| |
| public WorkerTaskMonitorTest() |
| { |
| testUtils = new TestUtils(); |
| jsonMapper = testUtils.getTestObjectMapper(); |
| indexMergerV9 = testUtils.getTestIndexMergerV9(); |
| indexIO = testUtils.getTestIndexIO(); |
| } |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| testingCluster = new TestingCluster(1); |
| testingCluster.start(); |
| |
| cf = CuratorFrameworkFactory.builder() |
| .connectString(testingCluster.getConnectString()) |
| .retryPolicy(new ExponentialBackoffRetry(1, 10)) |
| .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) |
| .build(); |
| cf.start(); |
| cf.blockUntilConnected(); |
| cf.create().creatingParentsIfNeeded().forPath(BASE_PATH); |
| |
| worker = new Worker( |
| "http", |
| "worker", |
| "localhost", |
| 3, |
| "0", |
| WorkerConfig.DEFAULT_CATEGORY |
| ); |
| |
| workerCuratorCoordinator = new WorkerCuratorCoordinator( |
| jsonMapper, |
| new IndexerZkConfig( |
| new ZkPathsConfig() |
| { |
| @Override |
| public String getBase() |
| { |
| return BASE_PATH; |
| } |
| }, null, null, null, null |
| ), |
| new TestRemoteTaskRunnerConfig(new Period("PT1S")), |
| cf, |
| worker |
| ); |
| workerCuratorCoordinator.start(); |
| |
| |
| // Start a task monitor |
| workerTaskMonitor = createTaskMonitor(); |
| TestTasks.registerSubtypes(jsonMapper); |
| jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime")); |
| workerTaskMonitor.start(); |
| |
| task = TestTasks.immediateSuccess("test"); |
| } |
| |
| private WorkerTaskMonitor createTaskMonitor() |
| { |
| final TaskConfig taskConfig = new TaskConfig( |
| FileUtils.createTempDir().toString(), |
| null, |
| null, |
| 0, |
| null, |
| false, |
| null, |
| null, |
| null, |
| false, |
| false |
| ); |
| TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); |
| TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); |
| EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes(); |
| SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); |
| EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory); |
| return new WorkerTaskMonitor( |
| jsonMapper, |
| new SingleTaskBackgroundRunner( |
| new TaskToolboxFactory( |
| taskConfig, |
| null, |
| taskActionClientFactory, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| notifierFactory, |
| null, |
| null, |
| NoopJoinableFactory.INSTANCE, |
| null, |
| new SegmentCacheManagerFactory(jsonMapper), |
| jsonMapper, |
| indexIO, |
| null, |
| null, |
| null, |
| indexMergerV9, |
| null, |
| null, |
| null, |
| null, |
| new NoopTestTaskReportFileWriter(), |
| null, |
| AuthTestUtils.TEST_AUTHORIZER_MAPPER, |
| new NoopChatHandlerProvider(), |
| testUtils.getRowIngestionMetersFactory(), |
| new TestAppenderatorsManager(), |
| new NoopIndexingServiceClient(), |
| null, |
| null, |
| null |
| ), |
| taskConfig, |
| new NoopServiceEmitter(), |
| DUMMY_NODE, |
| new ServerConfig() |
| ), |
| taskConfig, |
| cf, |
| workerCuratorCoordinator, |
| EasyMock.createNiceMock(DruidLeaderClient.class) |
| ); |
| } |
| |
| @After |
| public void tearDown() throws Exception |
| { |
| workerCuratorCoordinator.stop(); |
| workerTaskMonitor.stop(); |
| cf.close(); |
| testingCluster.stop(); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testRunTask() throws Exception |
| { |
| Assert.assertTrue( |
| TestUtils.conditionValid( |
| new IndexingServiceCondition() |
| { |
| @Override |
| public boolean isValid() |
| { |
| try { |
| return cf.checkExists().forPath(JOINER.join(TASKS_PATH, task.getId())) == null; |
| } |
| catch (Exception e) { |
| return false; |
| } |
| } |
| } |
| ) |
| ); |
| |
| cf.create() |
| .creatingParentsIfNeeded() |
| .forPath(JOINER.join(TASKS_PATH, task.getId()), jsonMapper.writeValueAsBytes(task)); |
| |
| Assert.assertTrue( |
| TestUtils.conditionValid( |
| new IndexingServiceCondition() |
| { |
| @Override |
| public boolean isValid() |
| { |
| try { |
| final byte[] bytes = cf.getData().forPath(JOINER.join(STATUS_PATH, task.getId())); |
| final TaskAnnouncement announcement = jsonMapper.readValue( |
| bytes, |
| TaskAnnouncement.class |
| ); |
| return announcement.getTaskStatus().isComplete(); |
| } |
| catch (Exception e) { |
| return false; |
| } |
| } |
| } |
| ) |
| ); |
| |
| TaskAnnouncement taskAnnouncement = jsonMapper.readValue( |
| cf.getData().forPath(JOINER.join(STATUS_PATH, task.getId())), TaskAnnouncement.class |
| ); |
| |
| Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId()); |
| Assert.assertEquals(TaskState.SUCCESS, taskAnnouncement.getTaskStatus().getStatusCode()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testGetAnnouncements() throws Exception |
| { |
| cf.create() |
| .creatingParentsIfNeeded() |
| .forPath(JOINER.join(TASKS_PATH, task.getId()), jsonMapper.writeValueAsBytes(task)); |
| |
| Assert.assertTrue( |
| TestUtils.conditionValid( |
| new IndexingServiceCondition() |
| { |
| @Override |
| public boolean isValid() |
| { |
| try { |
| final byte[] bytes = cf.getData().forPath(JOINER.join(STATUS_PATH, task.getId())); |
| final TaskAnnouncement announcement = jsonMapper.readValue( |
| bytes, |
| TaskAnnouncement.class |
| ); |
| return announcement.getTaskStatus().isComplete(); |
| } |
| catch (Exception e) { |
| return false; |
| } |
| } |
| } |
| ) |
| ); |
| |
| List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements(); |
| Assert.assertEquals(1, announcements.size()); |
| Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId()); |
| Assert.assertEquals(TaskState.SUCCESS, announcements.get(0).getTaskStatus().getStatusCode()); |
| Assert.assertEquals(DUMMY_NODE.getHost(), announcements.get(0).getTaskLocation().getHost()); |
| Assert.assertEquals(DUMMY_NODE.getPlaintextPort(), announcements.get(0).getTaskLocation().getPort()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testRestartCleansOldStatus() throws Exception |
| { |
| task = TestTasks.unending("test"); |
| |
| cf.create() |
| .creatingParentsIfNeeded() |
| .forPath(JOINER.join(TASKS_PATH, task.getId()), jsonMapper.writeValueAsBytes(task)); |
| |
| Assert.assertTrue( |
| TestUtils.conditionValid( |
| new IndexingServiceCondition() |
| { |
| @Override |
| public boolean isValid() |
| { |
| try { |
| return cf.checkExists().forPath(JOINER.join(STATUS_PATH, task.getId())) != null; |
| } |
| catch (Exception e) { |
| return false; |
| } |
| } |
| } |
| ) |
| ); |
| // simulate node restart |
| workerTaskMonitor.stop(); |
| workerTaskMonitor = createTaskMonitor(); |
| workerTaskMonitor.start(); |
| List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements(); |
| Assert.assertEquals(1, announcements.size()); |
| Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId()); |
| Assert.assertEquals(TaskState.FAILED, announcements.get(0).getTaskStatus().getStatusCode()); |
| Assert.assertEquals( |
| "Canceled as unknown task. See middleManager or indexer logs for more details.", |
| announcements.get(0).getTaskStatus().getErrorMsg() |
| ); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testStatusAnnouncementsArePersistent() throws Exception |
| { |
| cf.create() |
| .creatingParentsIfNeeded() |
| .forPath(JOINER.join(TASKS_PATH, task.getId()), jsonMapper.writeValueAsBytes(task)); |
| |
| Assert.assertTrue( |
| TestUtils.conditionValid( |
| new IndexingServiceCondition() |
| { |
| @Override |
| public boolean isValid() |
| { |
| try { |
| return cf.checkExists().forPath(JOINER.join(STATUS_PATH, task.getId())) != null; |
| } |
| catch (Exception e) { |
| return false; |
| } |
| } |
| } |
| ) |
| ); |
| // ephemeral owner is 0 is created node is PERSISTENT |
| Assert.assertEquals(0, cf.checkExists().forPath(JOINER.join(STATUS_PATH, task.getId())).getEphemeralOwner()); |
| |
| } |
| } |