| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordination; |
| |
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.LocalCacheProvider;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.ConcatQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
| |
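/**
 * Tests for {@link ServerManager}: which loaded segment versions a query is run against, how segments are
 * kept open or closed while queries are in flight, and how missing or closed segments are reported.
 */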
| public class ServerManagerTest |
| { |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
| private ServerManager serverManager; |
| private MyQueryRunnerFactory factory; |
| private CountDownLatch queryWaitLatch; |
| private CountDownLatch queryWaitYieldLatch; |
| private CountDownLatch queryNotifyLatch; |
| private ExecutorService serverManagerExec; |
| private SegmentManager segmentManager; |
| |
| @Before |
| public void setUp() |
| { |
| EmittingLogger.registerEmitter(new NoopServiceEmitter()); |
| |
| queryWaitLatch = new CountDownLatch(1); |
| queryWaitYieldLatch = new CountDownLatch(1); |
| queryNotifyLatch = new CountDownLatch(1); |
| factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch); |
| serverManagerExec = Executors.newFixedThreadPool(2); |
| segmentManager = new SegmentManager( |
| new SegmentLoader() |
| { |
| @Override |
| public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) |
| { |
| return new SegmentForTesting( |
| MapUtils.getString(segment.getLoadSpec(), "version"), |
| (Interval) segment.getLoadSpec().get("interval") |
| ); |
| } |
| |
| @Override |
| public void cleanup(DataSegment segment) |
| { |
| |
| } |
| } |
| ); |
| serverManager = new ServerManager( |
| new QueryRunnerFactoryConglomerate() |
| { |
| @Override |
| public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query) |
| { |
| if (query instanceof SearchQuery) { |
| return (QueryRunnerFactory) factory; |
| } else { |
| return null; |
| } |
| } |
| }, |
| new NoopServiceEmitter(), |
| new ForwardingQueryProcessingPool(serverManagerExec), |
| new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1), |
| new DefaultObjectMapper(), |
| new LocalCacheProvider().get(), |
| new CacheConfig(), |
| segmentManager, |
| NoopJoinableFactory.INSTANCE, |
| new ServerConfig() |
| ); |
| |
| loadQueryable("test", "1", Intervals.of("P1d/2011-04-01")); |
| loadQueryable("test", "1", Intervals.of("P1d/2011-04-02")); |
| loadQueryable("test", "2", Intervals.of("P1d/2011-04-02")); |
| loadQueryable("test", "1", Intervals.of("P1d/2011-04-03")); |
| loadQueryable("test", "1", Intervals.of("P1d/2011-04-04")); |
| loadQueryable("test", "1", Intervals.of("P1d/2011-04-05")); |
| loadQueryable("test", "2", Intervals.of("PT1h/2011-04-04T01")); |
| loadQueryable("test", "2", Intervals.of("PT1h/2011-04-04T02")); |
| loadQueryable("test", "2", Intervals.of("PT1h/2011-04-04T03")); |
| loadQueryable("test", "2", Intervals.of("PT1h/2011-04-04T05")); |
| loadQueryable("test", "2", Intervals.of("PT1h/2011-04-04T06")); |
| loadQueryable("test2", "1", Intervals.of("P1d/2011-04-01")); |
| loadQueryable("test2", "1", Intervals.of("P1d/2011-04-02")); |
| } |
| |
| @Test |
| public void testSimpleGet() |
| { |
| Future future = assertQueryable( |
| Granularities.DAY, |
| "test", |
| Intervals.of("P1d/2011-04-01"), |
| ImmutableList.of( |
| new Pair<String, Interval>("1", Intervals.of("P1d/2011-04-01")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| |
| |
| future = assertQueryable( |
| Granularities.DAY, |
| "test", Intervals.of("P2d/2011-04-02"), |
| ImmutableList.of( |
| new Pair<String, Interval>("1", Intervals.of("P1d/2011-04-01")), |
| new Pair<String, Interval>("2", Intervals.of("P1d/2011-04-02")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| } |
| |
| @Test |
| public void testDelete1() |
| { |
| final String dataSouce = "test"; |
| final Interval interval = Intervals.of("2011-04-01/2011-04-02"); |
| |
| Future future = assertQueryable( |
| Granularities.DAY, |
| dataSouce, interval, |
| ImmutableList.of( |
| new Pair<String, Interval>("2", interval) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| |
| dropQueryable(dataSouce, "2", interval); |
| future = assertQueryable( |
| Granularities.DAY, |
| dataSouce, interval, |
| ImmutableList.of( |
| new Pair<String, Interval>("1", interval) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| } |
| |
| @Test |
| public void testDelete2() |
| { |
| loadQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| Future future = assertQueryable( |
| Granularities.DAY, |
| "test", Intervals.of("2011-04-04/2011-04-06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("3", Intervals.of("2011-04-04/2011-04-05")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| |
| dropQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| dropQueryable("test", "1", Intervals.of("2011-04-04/2011-04-05")); |
| |
| future = assertQueryable( |
| Granularities.HOUR, |
| "test", Intervals.of("2011-04-04/2011-04-04T06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T00/2011-04-04T01")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T01/2011-04-04T02")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T02/2011-04-04T03")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T04/2011-04-04T05")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T05/2011-04-04T06")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| |
| future = assertQueryable( |
| Granularities.HOUR, |
| "test", Intervals.of("2011-04-04/2011-04-04T03"), |
| ImmutableList.of( |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T00/2011-04-04T01")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T01/2011-04-04T02")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T02/2011-04-04T03")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| |
| future = assertQueryable( |
| Granularities.HOUR, |
| "test", Intervals.of("2011-04-04T04/2011-04-04T06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T04/2011-04-04T05")), |
| new Pair<String, Interval>("2", Intervals.of("2011-04-04T05/2011-04-04T06")) |
| ) |
| ); |
| waitForTestVerificationAndCleanup(future); |
| } |
| |
| @Test |
| public void testReferenceCounting() throws Exception |
| { |
| loadQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| Future future = assertQueryable( |
| Granularities.DAY, |
| "test", Intervals.of("2011-04-04/2011-04-06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("3", Intervals.of("2011-04-04/2011-04-05")) |
| ) |
| ); |
| |
| queryNotifyLatch.await(1000, TimeUnit.MILLISECONDS); |
| |
| Assert.assertEquals(1, factory.getSegmentReferences().size()); |
| |
| for (ReferenceCountingSegment referenceCountingSegment : factory.getSegmentReferences()) { |
| Assert.assertEquals(1, referenceCountingSegment.getNumReferences()); |
| } |
| |
| queryWaitYieldLatch.countDown(); |
| |
| Assert.assertTrue(factory.getAdapters().size() == 1); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertFalse(segmentForTesting.isClosed()); |
| } |
| |
| queryWaitLatch.countDown(); |
| future.get(); |
| |
| dropQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertTrue(segmentForTesting.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testReferenceCountingWhileQueryExecuting() throws Exception |
| { |
| loadQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| Future future = assertQueryable( |
| Granularities.DAY, |
| "test", Intervals.of("2011-04-04/2011-04-06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("3", Intervals.of("2011-04-04/2011-04-05")) |
| ) |
| ); |
| |
| queryNotifyLatch.await(1000, TimeUnit.MILLISECONDS); |
| |
| Assert.assertEquals(1, factory.getSegmentReferences().size()); |
| |
| for (ReferenceCountingSegment referenceCountingSegment : factory.getSegmentReferences()) { |
| Assert.assertEquals(1, referenceCountingSegment.getNumReferences()); |
| } |
| |
| queryWaitYieldLatch.countDown(); |
| |
| Assert.assertEquals(1, factory.getAdapters().size()); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertFalse(segmentForTesting.isClosed()); |
| } |
| |
| dropQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertFalse(segmentForTesting.isClosed()); |
| } |
| |
| queryWaitLatch.countDown(); |
| future.get(); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertTrue(segmentForTesting.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testMultipleDrops() throws Exception |
| { |
| loadQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| Future future = assertQueryable( |
| Granularities.DAY, |
| "test", Intervals.of("2011-04-04/2011-04-06"), |
| ImmutableList.of( |
| new Pair<String, Interval>("3", Intervals.of("2011-04-04/2011-04-05")) |
| ) |
| ); |
| |
| queryNotifyLatch.await(1000, TimeUnit.MILLISECONDS); |
| |
| Assert.assertEquals(1, factory.getSegmentReferences().size()); |
| |
| for (ReferenceCountingSegment referenceCountingSegment : factory.getSegmentReferences()) { |
| Assert.assertEquals(1, referenceCountingSegment.getNumReferences()); |
| } |
| |
| queryWaitYieldLatch.countDown(); |
| |
| Assert.assertEquals(1, factory.getAdapters().size()); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertFalse(segmentForTesting.isClosed()); |
| } |
| |
| dropQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| dropQueryable("test", "3", Intervals.of("2011-04-04/2011-04-05")); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertFalse(segmentForTesting.isClosed()); |
| } |
| |
| queryWaitLatch.countDown(); |
| future.get(); |
| |
| for (SegmentForTesting segmentForTesting : factory.getAdapters()) { |
| Assert.assertTrue(segmentForTesting.isClosed()); |
| } |
| } |
| |
| @Test |
| public void testGetQueryRunnerForIntervalsWhenTimelineIsMissingReturningNoopQueryRunner() |
| { |
| final Interval interval = Intervals.of("0000-01-01/P1D"); |
| final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForIntervals( |
| searchQuery("unknown_datasource", interval, Granularities.ALL), |
| Collections.singletonList(interval) |
| ); |
| Assert.assertSame(NoopQueryRunner.class, queryRunner.getClass()); |
| } |
| |
| @Test |
| public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegments() |
| { |
| final Interval interval = Intervals.of("0000-01-01/P1D"); |
| final SearchQuery query = searchQuery("unknown_datasource", interval, Granularities.ALL); |
| final List<SegmentDescriptor> unknownSegments = Collections.singletonList( |
| new SegmentDescriptor(interval, "unknown_version", 0) |
| ); |
| final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments( |
| query, |
| unknownSegments |
| ); |
| final ResponseContext responseContext = DefaultResponseContext.createEmpty(); |
| final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); |
| Assert.assertTrue(results.isEmpty()); |
| Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); |
| Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); |
| } |
| |
| @Test |
| public void testGetQueryRunnerForSegmentsWhenTimelineEntryIsMissingReportingMissingSegments() |
| { |
| final Interval interval = Intervals.of("P1d/2011-04-01"); |
| final SearchQuery query = searchQuery("test", interval, Granularities.ALL); |
| final List<SegmentDescriptor> unknownSegments = Collections.singletonList( |
| new SegmentDescriptor(interval, "unknown_version", 0) |
| ); |
| final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments( |
| query, |
| unknownSegments |
| ); |
| final ResponseContext responseContext = DefaultResponseContext.createEmpty(); |
| final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); |
| Assert.assertTrue(results.isEmpty()); |
| Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); |
| Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); |
| } |
| |
| @Test |
| public void testGetQueryRunnerForSegmentsWhenTimelinePartitionChunkIsMissingReportingMissingSegments() |
| { |
| final Interval interval = Intervals.of("P1d/2011-04-01"); |
| final int unknownPartitionId = 1000; |
| final SearchQuery query = searchQuery("test", interval, Granularities.ALL); |
| final List<SegmentDescriptor> unknownSegments = Collections.singletonList( |
| new SegmentDescriptor(interval, "1", unknownPartitionId) |
| ); |
| final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments( |
| query, |
| unknownSegments |
| ); |
| final ResponseContext responseContext = DefaultResponseContext.createEmpty(); |
| final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); |
| Assert.assertTrue(results.isEmpty()); |
| Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); |
| Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); |
| } |
| |
| @Test |
| public void testGetQueryRunnerForSegmentsWhenSegmentIsClosedReportingMissingSegments() |
| { |
| final Interval interval = Intervals.of("P1d/2011-04-01"); |
| final SearchQuery query = searchQuery("test", interval, Granularities.ALL); |
| final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline = segmentManager |
| .getTimeline(DataSourceAnalysis.forDataSource(query.getDataSource())); |
| Assert.assertTrue(maybeTimeline.isPresent()); |
| final List<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = maybeTimeline.get().lookup(interval); |
| final List<SegmentDescriptor> closedSegments = new ArrayList<>(); |
| for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : holders) { |
| for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) { |
| final ReferenceCountingSegment segment = chunk.getObject(); |
| Assert.assertNotNull(segment.getId()); |
| closedSegments.add( |
| new SegmentDescriptor(segment.getDataInterval(), segment.getVersion(), segment.getId().getPartitionNum()) |
| ); |
| segment.close(); |
| } |
| } |
| final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments( |
| query, |
| closedSegments |
| ); |
| final ResponseContext responseContext = DefaultResponseContext.createEmpty(); |
| final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); |
| Assert.assertTrue(results.isEmpty()); |
| Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); |
| Assert.assertEquals(closedSegments, responseContext.get(Key.MISSING_SEGMENTS)); |
| } |
| |
| @Test |
| public void testGetQueryRunnerForSegmentsForUnknownQueryThrowingException() |
| { |
| final Interval interval = Intervals.of("P1d/2011-04-01"); |
| final List<SegmentDescriptor> descriptors = Collections.singletonList(new SegmentDescriptor(interval, "1", 0)); |
| expectedException.expect(QueryUnsupportedException.class); |
| expectedException.expectMessage("Unknown query type"); |
| serverManager.getQueryRunnerForSegments( |
| new BaseQuery<Object>( |
| new TableDataSource("test"), |
| new MultipleSpecificSegmentSpec(descriptors), |
| false, |
| new HashMap<>() |
| ) |
| { |
| @Override |
| public boolean hasFilters() |
| { |
| return false; |
| } |
| |
| @Override |
| public DimFilter getFilter() |
| { |
| return null; |
| } |
| |
| @Override |
| public String getType() |
| { |
| return null; |
| } |
| |
| @Override |
| public Query<Object> withOverriddenContext(Map<String, Object> contextOverride) |
| { |
| return null; |
| } |
| |
| @Override |
| public Query<Object> withQuerySegmentSpec(QuerySegmentSpec spec) |
| { |
| return null; |
| } |
| |
| @Override |
| public Query<Object> withDataSource(DataSource dataSource) |
| { |
| return null; |
| } |
| }, |
| descriptors |
| ); |
| } |
| |
| private void waitForTestVerificationAndCleanup(Future future) |
| { |
| try { |
| queryNotifyLatch.await(1000, TimeUnit.MILLISECONDS); |
| queryWaitYieldLatch.countDown(); |
| queryWaitLatch.countDown(); |
| future.get(); |
| factory.clearAdapters(); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private SearchQuery searchQuery(String datasource, Interval interval, Granularity granularity) |
| { |
| return Druids.newSearchQueryBuilder() |
| .dataSource(datasource) |
| .intervals(Collections.singletonList(interval)) |
| .granularity(granularity) |
| .limit(10000) |
| .query("wow") |
| .build(); |
| } |
| |
| private Future assertQueryable( |
| Granularity granularity, |
| String dataSource, |
| Interval interval, |
| List<Pair<String, Interval>> expected |
| ) |
| { |
| final Iterator<Pair<String, Interval>> expectedIter = expected.iterator(); |
| final List<Interval> intervals = Collections.singletonList(interval); |
| final SearchQuery query = searchQuery(dataSource, interval, granularity); |
| final QueryRunner<Result<SearchResultValue>> runner = serverManager.getQueryRunnerForIntervals( |
| query, |
| intervals |
| ); |
| return serverManagerExec.submit( |
| new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| Sequence<Result<SearchResultValue>> seq = runner.run(QueryPlus.wrap(query)); |
| seq.toList(); |
| Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator(); |
| |
| while (expectedIter.hasNext() && adaptersIter.hasNext()) { |
| Pair<String, Interval> expectedVals = expectedIter.next(); |
| SegmentForTesting value = adaptersIter.next(); |
| |
| Assert.assertEquals(expectedVals.lhs, value.getVersion()); |
| Assert.assertEquals(expectedVals.rhs, value.getInterval()); |
| } |
| |
| Assert.assertFalse(expectedIter.hasNext()); |
| Assert.assertFalse(adaptersIter.hasNext()); |
| } |
| } |
| ); |
| } |
| |
| public void loadQueryable(String dataSource, String version, Interval interval) |
| { |
| try { |
| segmentManager.loadSegment( |
| new DataSegment( |
| dataSource, |
| interval, |
| version, |
| ImmutableMap.of("version", version, "interval", interval), |
| Arrays.asList("dim1", "dim2", "dim3"), |
| Arrays.asList("metric1", "metric2"), |
| NoneShardSpec.instance(), |
| IndexIO.CURRENT_VERSION_ID, |
| 123L |
| ), |
| false, |
| SegmentLazyLoadFailCallback.NOOP |
| ); |
| } |
| catch (SegmentLoadingException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void dropQueryable(String dataSource, String version, Interval interval) |
| { |
| segmentManager.dropSegment( |
| new DataSegment( |
| dataSource, |
| interval, |
| version, |
| ImmutableMap.of("version", version, "interval", interval), |
| Arrays.asList("dim1", "dim2", "dim3"), |
| Arrays.asList("metric1", "metric2"), |
| NoneShardSpec.instance(), |
| IndexIO.CURRENT_VERSION_ID, |
| 123L |
| ) |
| ); |
| } |
| |
| public static class MyQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery> |
| { |
| private final CountDownLatch waitLatch; |
| private final CountDownLatch waitYieldLatch; |
| private final CountDownLatch notifyLatch; |
| private List<SegmentForTesting> adapters = new ArrayList<>(); |
| private List<ReferenceCountingSegment> segmentReferences = new ArrayList<>(); |
| |
| |
| public MyQueryRunnerFactory( |
| CountDownLatch waitLatch, |
| CountDownLatch waitYieldLatch, |
| CountDownLatch notifyLatch |
| ) |
| { |
| this.waitLatch = waitLatch; |
| this.waitYieldLatch = waitYieldLatch; |
| this.notifyLatch = notifyLatch; |
| } |
| |
| @Override |
| public QueryRunner<Result<SearchResultValue>> createRunner(Segment adapter) |
| { |
| if (!(adapter instanceof ReferenceCountingSegment)) { |
| throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass()); |
| } |
| final ReferenceCountingSegment segment = (ReferenceCountingSegment) adapter; |
| |
| Assert.assertTrue(segment.getNumReferences() > 0); |
| segmentReferences.add(segment); |
| adapters.add((SegmentForTesting) segment.getBaseSegment()); |
| return new BlockingQueryRunner<>(new NoopQueryRunner<>(), waitLatch, waitYieldLatch, notifyLatch); |
| } |
| |
| @Override |
| public QueryRunner<Result<SearchResultValue>> mergeRunners( |
| QueryProcessingPool queryProcessingPool, |
| Iterable<QueryRunner<Result<SearchResultValue>>> queryRunners |
| ) |
| { |
| return new ConcatQueryRunner<>(Sequences.simple(queryRunners)); |
| } |
| |
| @Override |
| public QueryToolChest<Result<SearchResultValue>, SearchQuery> getToolchest() |
| { |
| return new NoopQueryToolChest<>(); |
| } |
| |
| public List<SegmentForTesting> getAdapters() |
| { |
| return adapters; |
| } |
| |
| public List<ReferenceCountingSegment> getSegmentReferences() |
| { |
| return segmentReferences; |
| } |
| |
| public void clearAdapters() |
| { |
| adapters.clear(); |
| } |
| } |
| |
| public static class NoopQueryToolChest<T, QueryType extends Query<T>> extends QueryToolChest<T, QueryType> |
| { |
| @Override |
| public QueryRunner<T> mergeResults(QueryRunner<T> runner) |
| { |
| return runner; |
| } |
| |
| @Override |
| public QueryMetrics<Query<?>> makeMetrics(QueryType query) |
| { |
| return new DefaultQueryMetrics<>(); |
| } |
| |
| @Override |
| public Function<T, T> makePreComputeManipulatorFn(QueryType query, MetricManipulationFn fn) |
| { |
| return Functions.identity(); |
| } |
| |
| @Override |
| public TypeReference<T> getResultTypeReference() |
| { |
| return new TypeReference<T>() |
| { |
| }; |
| } |
| } |
| |
| private static class SegmentForTesting implements Segment |
| { |
| private final String version; |
| private final Interval interval; |
| private final Object lock = new Object(); |
| private volatile boolean closed = false; |
| |
| SegmentForTesting( |
| String version, |
| Interval interval |
| ) |
| { |
| this.version = version; |
| this.interval = interval; |
| } |
| |
| public String getVersion() |
| { |
| return version; |
| } |
| |
| public Interval getInterval() |
| { |
| return interval; |
| } |
| |
| @Override |
| public SegmentId getId() |
| { |
| return SegmentId.dummy(version); |
| } |
| |
| public boolean isClosed() |
| { |
| return closed; |
| } |
| |
| @Override |
| public Interval getDataInterval() |
| { |
| return interval; |
| } |
| |
| @Override |
| public QueryableIndex asQueryableIndex() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public StorageAdapter asStorageAdapter() |
| { |
| return makeFakeStorageAdapter(interval, 0); |
| } |
| |
| @Override |
| public void close() |
| { |
| synchronized (lock) { |
| closed = true; |
| } |
| } |
| |
| private StorageAdapter makeFakeStorageAdapter(Interval interval, int cardinality) |
| { |
| StorageAdapter adapter = new StorageAdapter() |
| { |
| @Override |
| public Interval getInterval() |
| { |
| return interval; |
| } |
| |
| @Override |
| public int getDimensionCardinality(String column) |
| { |
| return cardinality; |
| } |
| |
| @Override |
| public DateTime getMinTime() |
| { |
| return interval.getStart(); |
| } |
| |
| |
| @Override |
| public DateTime getMaxTime() |
| { |
| return interval.getEnd(); |
| } |
| |
| // stubs below this line not important for tests |
| |
| @Override |
| public Indexed<String> getAvailableDimensions() |
| { |
| return null; |
| } |
| |
| @Override |
| public Iterable<String> getAvailableMetrics() |
| { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| public Comparable getMinValue(String column) |
| { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| public Comparable getMaxValue(String column) |
| { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| public ColumnCapabilities getColumnCapabilities(String column) |
| { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| public String getColumnTypeName(String column) |
| { |
| return null; |
| } |
| |
| @Override |
| public int getNumRows() |
| { |
| return 0; |
| } |
| |
| @Override |
| public DateTime getMaxIngestedEventTime() |
| { |
| return null; |
| } |
| |
| @Override |
| public Metadata getMetadata() |
| { |
| return null; |
| } |
| |
| @Override |
| public Sequence<Cursor> makeCursors( |
| @Nullable Filter filter, |
| Interval interval, |
| VirtualColumns virtualColumns, |
| Granularity gran, |
| boolean descending, |
| @Nullable QueryMetrics<?> queryMetrics |
| ) |
| { |
| return null; |
| } |
| }; |
| |
| return adapter; |
| } |
| } |
| |
| private static class BlockingQueryRunner<T> implements QueryRunner<T> |
| { |
| private final QueryRunner<T> runner; |
| private final CountDownLatch waitLatch; |
| private final CountDownLatch waitYieldLatch; |
| private final CountDownLatch notifyLatch; |
| |
| public BlockingQueryRunner( |
| QueryRunner<T> runner, |
| CountDownLatch waitLatch, |
| CountDownLatch waitYieldLatch, |
| CountDownLatch notifyLatch |
| ) |
| { |
| this.runner = runner; |
| this.waitLatch = waitLatch; |
| this.waitYieldLatch = waitYieldLatch; |
| this.notifyLatch = notifyLatch; |
| } |
| |
| @Override |
| public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) |
| { |
| return new BlockingSequence<>(runner.run(queryPlus, responseContext), waitLatch, waitYieldLatch, notifyLatch); |
| } |
| } |
| |
| private static class BlockingSequence<T> extends YieldingSequenceBase<T> |
| { |
| private final Sequence<T> baseSequence; |
| private final CountDownLatch waitLatch; |
| private final CountDownLatch waitYieldLatch; |
| private final CountDownLatch notifyLatch; |
| |
| |
| private BlockingSequence( |
| Sequence<T> baseSequence, |
| CountDownLatch waitLatch, |
| CountDownLatch waitYieldLatch, |
| CountDownLatch notifyLatch |
| ) |
| { |
| this.baseSequence = baseSequence; |
| this.waitLatch = waitLatch; |
| this.waitYieldLatch = waitYieldLatch; |
| this.notifyLatch = notifyLatch; |
| } |
| |
| @Override |
| public <OutType> Yielder<OutType> toYielder( |
| final OutType initValue, |
| final YieldingAccumulator<OutType, T> accumulator |
| ) |
| { |
| notifyLatch.countDown(); |
| |
| try { |
| waitYieldLatch.await(1000, TimeUnit.MILLISECONDS); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
| final Yielder<OutType> baseYielder = baseSequence.toYielder(initValue, accumulator); |
| return new Yielder<OutType>() |
| { |
| @Override |
| public OutType get() |
| { |
| try { |
| waitLatch.await(1000, TimeUnit.MILLISECONDS); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| return baseYielder.get(); |
| } |
| |
| @Override |
| public Yielder<OutType> next(OutType initValue) |
| { |
| return baseYielder.next(initValue); |
| } |
| |
| @Override |
| public boolean isDone() |
| { |
| return baseYielder.isDone(); |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| baseYielder.close(); |
| } |
| }; |
| } |
| } |
| } |