| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordination; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.druid.guice.ServerTypeConfig; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.loading.CacheTestSegmentCacheManager; |
| import org.apache.druid.segment.loading.CacheTestSegmentLoader; |
| import org.apache.druid.segment.loading.SegmentLoader; |
| import org.apache.druid.segment.loading.SegmentLoaderConfig; |
| import org.apache.druid.segment.loading.StorageLocationConfig; |
| import org.apache.druid.server.SegmentManager; |
| import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; |
| import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; |
| import org.apache.druid.server.metrics.NoopServiceEmitter; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.partition.NoneShardSpec; |
| import org.joda.time.Interval; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| import org.mockito.ArgumentMatchers; |
| import org.mockito.Mockito; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
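/**
 * Tests for {@link SegmentLoadDropHandler}: add/remove ordering for a single segment, loading
 * segments from the on-disk info cache on start, and batch processing of change requests.
 */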
| public class SegmentLoadDropHandlerTest |
| { |
| public static final int COUNT = 50; |
| |
| private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); |
| |
| private SegmentLoadDropHandler segmentLoadDropHandler; |
| |
| private DataSegmentAnnouncer announcer; |
| private File infoDir; |
| private TestStorageLocation testStorageLocation; |
| private AtomicInteger announceCount; |
| private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe; |
| private CacheTestSegmentCacheManager segmentCacheManager; |
| private SegmentLoader segmentLoader; |
| private SegmentManager segmentManager; |
| private List<Runnable> scheduledRunnable; |
| private SegmentLoaderConfig segmentLoaderConfig; |
| private SegmentLoaderConfig segmentLoaderConfigNoLocations; |
| private ScheduledExecutorFactory scheduledExecutorFactory; |
| private List<StorageLocationConfig> locations; |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
| @Rule |
| public TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| public SegmentLoadDropHandlerTest() |
| { |
| EmittingLogger.registerEmitter(new NoopServiceEmitter()); |
| } |
| |
| @Before |
| public void setUp() |
| { |
| try { |
| testStorageLocation = new TestStorageLocation(temporaryFolder); |
| infoDir = testStorageLocation.getInfoDir(); |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| locations = Collections.singletonList( |
| testStorageLocation.toStorageLocationConfig() |
| ); |
| |
| scheduledRunnable = new ArrayList<>(); |
| |
| segmentCacheManager = new CacheTestSegmentCacheManager(); |
| segmentLoader = new CacheTestSegmentLoader(); |
| segmentManager = new SegmentManager(segmentLoader); |
| segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); |
| announceCount = new AtomicInteger(0); |
| |
| announcer = new DataSegmentAnnouncer() |
| { |
| @Override |
| public void announceSegment(DataSegment segment) |
| { |
| segmentsAnnouncedByMe.add(segment); |
| announceCount.incrementAndGet(); |
| } |
| |
| @Override |
| public void unannounceSegment(DataSegment segment) |
| { |
| segmentsAnnouncedByMe.remove(segment); |
| announceCount.decrementAndGet(); |
| } |
| |
| @Override |
| public void announceSegments(Iterable<DataSegment> segments) |
| { |
| for (DataSegment segment : segments) { |
| segmentsAnnouncedByMe.add(segment); |
| } |
| announceCount.addAndGet(Iterables.size(segments)); |
| } |
| |
| @Override |
| public void unannounceSegments(Iterable<DataSegment> segments) |
| { |
| for (DataSegment segment : segments) { |
| segmentsAnnouncedByMe.remove(segment); |
| } |
| announceCount.addAndGet(-Iterables.size(segments)); |
| } |
| }; |
| |
| |
| segmentLoaderConfig = new SegmentLoaderConfig() |
| { |
| @Override |
| public File getInfoDir() |
| { |
| return testStorageLocation.getInfoDir(); |
| } |
| |
| @Override |
| public int getNumLoadingThreads() |
| { |
| return 5; |
| } |
| |
| @Override |
| public int getAnnounceIntervalMillis() |
| { |
| return 50; |
| } |
| |
| @Override |
| public List<StorageLocationConfig> getLocations() |
| { |
| return locations; |
| } |
| |
| @Override |
| public int getDropSegmentDelayMillis() |
| { |
| return 0; |
| } |
| }; |
| |
| segmentLoaderConfigNoLocations = new SegmentLoaderConfig() |
| { |
| @Override |
| public int getNumLoadingThreads() |
| { |
| return 5; |
| } |
| |
| @Override |
| public int getAnnounceIntervalMillis() |
| { |
| return 50; |
| } |
| |
| |
| @Override |
| public int getDropSegmentDelayMillis() |
| { |
| return 0; |
| } |
| }; |
| |
| scheduledExecutorFactory = new ScheduledExecutorFactory() |
| { |
| @Override |
| public ScheduledExecutorService create(int corePoolSize, String nameFormat) |
| { |
| /* |
| Override normal behavoir by adding the runnable to a list so that you can make sure |
| all the shceduled runnables are executed by explicitly calling run() on each item in the list |
| */ |
| return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) |
| { |
| @Override |
| public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) |
| { |
| scheduledRunnable.add(command); |
| return null; |
| } |
| }; |
| } |
| }; |
| |
| segmentLoadDropHandler = new SegmentLoadDropHandler( |
| jsonMapper, |
| segmentLoaderConfig, |
| announcer, |
| Mockito.mock(DataSegmentServerAnnouncer.class), |
| segmentManager, |
| segmentCacheManager, |
| scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), |
| new ServerTypeConfig(ServerType.HISTORICAL) |
| ); |
| } |
| |
| /** |
| * Steps: |
| * 1. removeSegment() schedules a delete runnable that deletes segment files, |
| * 2. addSegment() succesfully loads the segment and annouces it |
| * 3. scheduled delete task executes and realizes it should not delete the segment files. |
| */ |
| @Test |
| public void testSegmentLoading1() throws Exception |
| { |
| segmentLoadDropHandler.start(); |
| |
| final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); |
| |
| segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); |
| |
| Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); |
| |
| segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); |
| |
| /* |
| make sure the scheduled runnable that "deletes" segment files has been executed. |
| Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in |
| ZkCoordinator, the scheduled runnable will not actually delete segment files. |
| */ |
| for (Runnable runnable : scheduledRunnable) { |
| runnable.run(); |
| } |
| |
| Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); |
| Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment)); |
| |
| segmentLoadDropHandler.stop(); |
| } |
| |
| /** |
| * Steps: |
| * 1. addSegment() succesfully loads the segment and annouces it |
| * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files |
| * 3. addSegment() calls loadSegment() and annouces it again |
| * 4. scheduled delete task executes and realizes it should not delete the segment files. |
| */ |
| @Test |
| public void testSegmentLoading2() throws Exception |
| { |
| segmentLoadDropHandler.start(); |
| |
| final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); |
| |
| segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); |
| |
| Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); |
| |
| segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); |
| |
| Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); |
| |
| segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); |
| |
| /* |
| make sure the scheduled runnable that "deletes" segment files has been executed. |
| Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in |
| ZkCoordinator, the scheduled runnable will not actually delete segment files. |
| */ |
| for (Runnable runnable : scheduledRunnable) { |
| runnable.run(); |
| } |
| |
| Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); |
| Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment)); |
| |
| segmentLoadDropHandler.stop(); |
| } |
| |
| @Test |
| public void testLoadCache() throws Exception |
| { |
| Set<DataSegment> segments = new HashSet<>(); |
| for (int i = 0; i < COUNT; ++i) { |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02"))); |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03"))); |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04"))); |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06"))); |
| segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); |
| segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); |
| } |
| |
| for (DataSegment segment : segments) { |
| testStorageLocation.writeSegmentInfoToCache(segment); |
| } |
| |
| testStorageLocation.checkInfoCache(segments); |
| Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); |
| segmentLoadDropHandler.start(); |
| Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); |
| for (int i = 0; i < COUNT; ++i) { |
| Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); |
| Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); |
| } |
| Assert.assertEquals(13 * COUNT, announceCount.get()); |
| segmentLoadDropHandler.stop(); |
| |
| for (DataSegment segment : segments) { |
| testStorageLocation.deleteSegmentInfoFromCache(segment); |
| } |
| |
| Assert.assertEquals(0, infoDir.listFiles().length); |
| Assert.assertTrue(infoDir.delete()); |
| } |
| |
| private DataSegment makeSegment(String dataSource, String version, Interval interval) |
| { |
| return new DataSegment( |
| dataSource, |
| interval, |
| version, |
| ImmutableMap.of("version", version, "interval", interval, "cacheDir", infoDir), |
| Arrays.asList("dim1", "dim2", "dim3"), |
| Arrays.asList("metric1", "metric2"), |
| NoneShardSpec.instance(), |
| IndexIO.CURRENT_VERSION_ID, |
| 123L |
| ); |
| } |
| |
| @Test |
| public void testStartStop() throws Exception |
| { |
| SegmentLoadDropHandler handler = new SegmentLoadDropHandler( |
| jsonMapper, |
| new SegmentLoaderConfig() |
| { |
| @Override |
| public File getInfoDir() |
| { |
| return infoDir; |
| } |
| |
| @Override |
| public int getNumLoadingThreads() |
| { |
| return 5; |
| } |
| |
| @Override |
| public List<StorageLocationConfig> getLocations() |
| { |
| return locations; |
| } |
| |
| @Override |
| public int getAnnounceIntervalMillis() |
| { |
| return 50; |
| } |
| }, |
| announcer, |
| Mockito.mock(DataSegmentServerAnnouncer.class), |
| segmentManager, |
| segmentCacheManager, |
| new ServerTypeConfig(ServerType.HISTORICAL) |
| ); |
| |
| Set<DataSegment> segments = new HashSet<>(); |
| for (int i = 0; i < COUNT; ++i) { |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); |
| segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); |
| segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02"))); |
| segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); |
| segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); |
| } |
| |
| for (DataSegment segment : segments) { |
| testStorageLocation.writeSegmentInfoToCache(segment); |
| } |
| |
| testStorageLocation.checkInfoCache(segments); |
| Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); |
| |
| handler.start(); |
| Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); |
| for (int i = 0; i < COUNT; ++i) { |
| Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); |
| Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); |
| } |
| Assert.assertEquals(5 * COUNT, announceCount.get()); |
| handler.stop(); |
| |
| for (DataSegment segment : segments) { |
| testStorageLocation.deleteSegmentInfoFromCache(segment); |
| } |
| |
| Assert.assertEquals(0, infoDir.listFiles().length); |
| Assert.assertTrue(infoDir.delete()); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testProcessBatch() throws Exception |
| { |
| segmentLoadDropHandler.start(); |
| |
| DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); |
| DataSegment segment2 = makeSegment("batchtest2", "1", Intervals.of("P1d/2011-04-01")); |
| |
| List<DataSegmentChangeRequest> batch = ImmutableList.of( |
| new SegmentChangeRequestLoad(segment1), |
| new SegmentChangeRequestDrop(segment2) |
| ); |
| |
| ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler |
| .processBatch(batch); |
| |
| List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = future.get(); |
| Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING, result.get(0).getStatus()); |
| Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus()); |
| |
| for (Runnable runnable : scheduledRunnable) { |
| runnable.run(); |
| } |
| |
| result = segmentLoadDropHandler.processBatch(batch).get(); |
| Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus()); |
| Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus()); |
| |
| |
| for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch) |
| .get()) { |
| Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus()); |
| } |
| |
| segmentLoadDropHandler.stop(); |
| } |
| |
| @Test(timeout = 60_000L) |
| public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception |
| { |
| final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); |
| Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any())) |
| .thenThrow(new RuntimeException("segment loading failure test")) |
| .thenReturn(true); |
| final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( |
| jsonMapper, |
| segmentLoaderConfig, |
| announcer, |
| Mockito.mock(DataSegmentServerAnnouncer.class), |
| segmentManager, |
| segmentCacheManager, |
| scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), |
| new ServerTypeConfig(ServerType.HISTORICAL) |
| ); |
| |
| segmentLoadDropHandler.start(); |
| |
| DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); |
| |
| List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); |
| |
| ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler |
| .processBatch(batch); |
| |
| for (Runnable runnable : scheduledRunnable) { |
| runnable.run(); |
| } |
| List<DataSegmentChangeRequestAndStatus> result = future.get(); |
| Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); |
| |
| future = segmentLoadDropHandler.processBatch(batch); |
| for (Runnable runnable : scheduledRunnable) { |
| runnable.run(); |
| } |
| result = future.get(); |
| Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); |
| |
| segmentLoadDropHandler.stop(); |
| } |
| } |