| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.client; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.dataformat.smile.SmileFactory; |
| import com.fasterxml.jackson.dataformat.smile.SmileGenerator; |
| import com.google.common.base.Predicates; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; |
| import org.apache.druid.client.selector.RandomServerSelectorStrategy; |
| import org.apache.druid.client.selector.ServerSelector; |
| import org.apache.druid.curator.CuratorTestBase; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.http.client.HttpClient; |
| import org.apache.druid.query.QueryToolChestWarehouse; |
| import org.apache.druid.query.QueryWatcher; |
| import org.apache.druid.query.TableDataSource; |
| import org.apache.druid.query.planning.DataSourceAnalysis; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.server.coordination.DruidServerMetadata; |
| import org.apache.druid.server.coordination.ServerType; |
| import org.apache.druid.server.initialization.ZkPathsConfig; |
| import org.apache.druid.server.metrics.NoopServiceEmitter; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.TimelineLookup; |
| import org.apache.druid.timeline.TimelineObjectHolder; |
| import org.apache.druid.timeline.partition.NoneShardSpec; |
| import org.apache.druid.timeline.partition.PartitionHolder; |
| import org.apache.druid.timeline.partition.SingleElementPartitionChunk; |
| import org.easymock.EasyMock; |
| import org.joda.time.Interval; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| |
| public class BrokerServerViewTest extends CuratorTestBase |
| { |
| private final ObjectMapper jsonMapper; |
| private final ZkPathsConfig zkPathsConfig; |
| |
| private CountDownLatch segmentViewInitLatch; |
| private CountDownLatch segmentAddedLatch; |
| private CountDownLatch segmentRemovedLatch; |
| |
| private BatchServerInventoryView baseView; |
| private BrokerServerView brokerServerView; |
| |
| public BrokerServerViewTest() |
| { |
| jsonMapper = TestHelper.makeJsonMapper(); |
| zkPathsConfig = new ZkPathsConfig(); |
| } |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| setupServerAndCurator(); |
| curator.start(); |
| curator.blockUntilConnected(); |
| } |
| |
| @Test |
| public void testSingleServerAddedRemovedSegment() throws Exception |
| { |
| segmentViewInitLatch = new CountDownLatch(1); |
| segmentAddedLatch = new CountDownLatch(1); |
| segmentRemovedLatch = new CountDownLatch(1); |
| |
| setupViews(); |
| |
| final DruidServer druidServer = setupHistoricalServer("default_tier", "localhost:1234", 0); |
| final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1"); |
| final int partition = segment.getShardSpec().getPartitionNum(); |
| final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D"); |
| announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); |
| |
| TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) |
| ).get(); |
| List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes = timeline.lookup(intervals); |
| Assert.assertEquals(1, serverLookupRes.size()); |
| |
| TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = serverLookupRes.get(0); |
| Assert.assertEquals(intervals, actualTimelineObjectHolder.getInterval()); |
| Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion()); |
| |
| PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject(); |
| Assert.assertTrue(actualPartitionHolder.isComplete()); |
| Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); |
| |
| ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject(); |
| Assert.assertFalse(selector.isEmpty()); |
| Assert.assertEquals(segment, selector.getSegment()); |
| Assert.assertEquals(druidServer, selector.pick(null).getServer()); |
| Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition)); |
| |
| unannounceSegmentForServer(druidServer, segment, zkPathsConfig); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); |
| |
| Assert.assertEquals( |
| 0, |
| timeline.lookup(intervals).size() |
| ); |
| Assert.assertNull(timeline.findChunk(intervals, "v1", partition)); |
| } |
| |
| @Test |
| public void testMultipleServerAddedRemovedSegment() throws Exception |
| { |
| segmentViewInitLatch = new CountDownLatch(1); |
| segmentAddedLatch = new CountDownLatch(5); |
| |
| // temporarily set latch count to 1 |
| segmentRemovedLatch = new CountDownLatch(1); |
| |
| setupViews(); |
| |
| final List<DruidServer> druidServers = Lists.transform( |
| ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), |
| hostname -> setupHistoricalServer("default_tier", hostname, 0) |
| ); |
| |
| final List<DataSegment> segments = Lists.transform( |
| ImmutableList.of( |
| Pair.of("2011-04-01/2011-04-03", "v1"), |
| Pair.of("2011-04-03/2011-04-06", "v1"), |
| Pair.of("2011-04-01/2011-04-09", "v2"), |
| Pair.of("2011-04-06/2011-04-09", "v3"), |
| Pair.of("2011-04-01/2011-04-02", "v3") |
| ), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs) |
| ); |
| |
| for (int i = 0; i < 5; ++i) { |
| announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); |
| } |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); |
| |
| TimelineLookup timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) |
| ).get(); |
| assertValues( |
| Arrays.asList( |
| createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), |
| createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), |
| createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) |
| ), |
| (List<TimelineObjectHolder>) timeline.lookup( |
| Intervals.of( |
| "2011-04-01/2011-04-09" |
| ) |
| ) |
| ); |
| |
| // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") |
| unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); |
| |
| // renew segmentRemovedLatch since we still have 4 segments to unannounce |
| segmentRemovedLatch = new CountDownLatch(4); |
| |
| timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) |
| ).get(); |
| assertValues( |
| Arrays.asList( |
| createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), |
| createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)), |
| createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)), |
| createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) |
| ), |
| (List<TimelineObjectHolder>) timeline.lookup( |
| Intervals.of( |
| "2011-04-01/2011-04-09" |
| ) |
| ) |
| ); |
| |
| // unannounce all the segments |
| for (int i = 0; i < 5; ++i) { |
| // skip the one that was previously unannounced |
| if (i != 2) { |
| unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig); |
| } |
| } |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); |
| |
| Assert.assertEquals( |
| 0, |
| ((List<TimelineObjectHolder>) timeline.lookup(Intervals.of("2011-04-01/2011-04-09"))).size() |
| ); |
| } |
| |
| @Test |
| public void testMultipleServerAndBroker() throws Exception |
| { |
| segmentViewInitLatch = new CountDownLatch(1); |
| segmentAddedLatch = new CountDownLatch(6); |
| |
| // temporarily set latch count to 1 |
| segmentRemovedLatch = new CountDownLatch(1); |
| |
| setupViews(); |
| |
| final DruidServer druidBroker = new DruidServer( |
| "localhost:5", |
| "localhost:5", |
| null, |
| 10000000L, |
| ServerType.BROKER, |
| "default_tier", |
| 0 |
| ); |
| |
| final List<DruidServer> druidServers = Lists.transform( |
| ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), |
| hostname -> setupHistoricalServer("default_tier", hostname, 0) |
| ); |
| |
| setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper); |
| |
| final List<DataSegment> segments = Lists.transform( |
| ImmutableList.of( |
| Pair.of("2011-04-01/2011-04-03", "v1"), |
| Pair.of("2011-04-03/2011-04-06", "v1"), |
| Pair.of("2011-04-01/2011-04-09", "v2"), |
| Pair.of("2011-04-06/2011-04-09", "v3"), |
| Pair.of("2011-04-01/2011-04-02", "v3") |
| ), |
| input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs) |
| ); |
| |
| DataSegment brokerSegment = dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4"); |
| announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, jsonMapper); |
| for (int i = 0; i < 5; ++i) { |
| announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); |
| } |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); |
| |
| TimelineLookup timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) |
| ).get(); |
| |
| assertValues( |
| Arrays.asList( |
| createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), |
| createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), |
| createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) |
| ), |
| (List<TimelineObjectHolder>) timeline.lookup( |
| Intervals.of( |
| "2011-04-01/2011-04-09" |
| ) |
| ) |
| ); |
| |
| // unannounce the broker segment should do nothing to announcements |
| unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); |
| |
| // renew segmentRemovedLatch since we still have 5 segments to unannounce |
| segmentRemovedLatch = new CountDownLatch(5); |
| |
| timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) |
| ).get(); |
| |
| // expect same set of segments as before |
| assertValues( |
| Arrays.asList( |
| createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), |
| createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), |
| createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) |
| ), |
| (List<TimelineObjectHolder>) timeline.lookup( |
| Intervals.of( |
| "2011-04-01/2011-04-09" |
| ) |
| ) |
| ); |
| |
| // unannounce all the segments |
| for (int i = 0; i < 5; ++i) { |
| unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig); |
| } |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); |
| } |
| |
| @Test |
| public void testMultipleTiers() throws Exception |
| { |
| segmentViewInitLatch = new CountDownLatch(1); |
| segmentAddedLatch = new CountDownLatch(4); |
| segmentRemovedLatch = new CountDownLatch(0); |
| |
| // Setup a Broker that watches only Tier 2 |
| final String tier1 = "tier1"; |
| final String tier2 = "tier2"; |
| setupViews(Sets.newHashSet(tier2)); |
| |
| // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 |
| final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); |
| final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1); |
| |
| final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); |
| announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper); |
| |
| final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); |
| announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper); |
| announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper); |
| |
| final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); |
| announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper); |
| |
| // Wait for the segments to be added |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); |
| Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); |
| |
| // Get the timeline for the datasource |
| TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline( |
| DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) |
| ).get(); |
| |
| // Verify that the timeline has no entry for the interval of segment 1 |
| Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); |
| |
| // Verify that there is one entry for the interval of segment 2 |
| List<TimelineObjectHolder<String, ServerSelector>> timelineHolders = |
| timeline.lookup(segment2.getInterval()); |
| Assert.assertEquals(1, timelineHolders.size()); |
| |
| TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0); |
| Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); |
| Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); |
| |
| PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject(); |
| Assert.assertTrue(partitionHolder.isComplete()); |
| Assert.assertEquals(1, Iterables.size(partitionHolder)); |
| |
| ServerSelector selector = (partitionHolder.iterator().next()).getObject(); |
| Assert.assertFalse(selector.isEmpty()); |
| Assert.assertEquals(segment2, selector.getSegment()); |
| |
| // Verify that the ServerSelector always picks Tier 1 |
| for (int i = 0; i < 5; ++i) { |
| Assert.assertEquals(server21, selector.pick(null).getServer()); |
| } |
| Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); |
| } |
| |
| /** |
| * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it. |
| */ |
| private DruidServer setupHistoricalServer(String tier, String name, int priority) |
| { |
| final DruidServer historical = new DruidServer( |
| name, |
| name, |
| null, |
| 1000000, |
| ServerType.HISTORICAL, |
| tier, |
| priority |
| ); |
| setupZNodeForServer(historical, zkPathsConfig, jsonMapper); |
| return historical; |
| } |
| |
| private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected( |
| String intervalStr, |
| String version, |
| DruidServer druidServer, |
| DataSegment segment |
| ) |
| { |
| return Pair.of(Intervals.of(intervalStr), Pair.of(version, Pair.of(druidServer, segment))); |
| } |
| |
| private void assertValues( |
| List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual |
| ) |
| { |
| Assert.assertEquals(expected.size(), actual.size()); |
| |
| for (int i = 0; i < expected.size(); ++i) { |
| Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i); |
| TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = actual.get(i); |
| |
| Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval()); |
| Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion()); |
| |
| PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject(); |
| Assert.assertTrue(actualPartitionHolder.isComplete()); |
| Assert.assertEquals(1, Iterables.size(actualPartitionHolder)); |
| |
| ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator() |
| .next()).getObject(); |
| Assert.assertFalse(selector.isEmpty()); |
| Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null).getServer()); |
| Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment()); |
| } |
| } |
| |
| private void setupViews() throws Exception |
| { |
| setupViews(null); |
| } |
| |
| private void setupViews(Set<String> watchedTiers) throws Exception |
| { |
| baseView = new BatchServerInventoryView( |
| zkPathsConfig, |
| curator, |
| jsonMapper, |
| Predicates.alwaysTrue() |
| ) |
| { |
| @Override |
| public void registerSegmentCallback(Executor exec, final SegmentCallback callback) |
| { |
| super.registerSegmentCallback( |
| exec, |
| new SegmentCallback() |
| { |
| @Override |
| public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) |
| { |
| CallbackAction res = callback.segmentAdded(server, segment); |
| segmentAddedLatch.countDown(); |
| return res; |
| } |
| |
| @Override |
| public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) |
| { |
| CallbackAction res = callback.segmentRemoved(server, segment); |
| segmentRemovedLatch.countDown(); |
| return res; |
| } |
| |
| @Override |
| public CallbackAction segmentViewInitialized() |
| { |
| CallbackAction res = callback.segmentViewInitialized(); |
| segmentViewInitLatch.countDown(); |
| return res; |
| } |
| } |
| ); |
| } |
| }; |
| |
| brokerServerView = new BrokerServerView( |
| EasyMock.createMock(QueryToolChestWarehouse.class), |
| EasyMock.createMock(QueryWatcher.class), |
| getSmileMapper(), |
| EasyMock.createMock(HttpClient.class), |
| baseView, |
| new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), |
| new NoopServiceEmitter(), |
| new BrokerSegmentWatcherConfig() |
| { |
| @Override |
| public Set<String> getWatchedTiers() |
| { |
| return watchedTiers; |
| } |
| } |
| ); |
| |
| baseView.start(); |
| } |
| |
| private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) |
| { |
| return DataSegment.builder() |
| .dataSource("test_broker_server_view") |
| .interval(Intervals.of(intervalStr)) |
| .loadSpec( |
| ImmutableMap.of( |
| "type", |
| "local", |
| "path", |
| "somewhere" |
| ) |
| ) |
| .version(version) |
| .dimensions(ImmutableList.of()) |
| .metrics(ImmutableList.of()) |
| .shardSpec(NoneShardSpec.instance()) |
| .binaryVersion(9) |
| .size(0) |
| .build(); |
| } |
| |
| public ObjectMapper getSmileMapper() |
| { |
| final SmileFactory smileFactory = new SmileFactory(); |
| smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false); |
| smileFactory.delegateToTextual(true); |
| final ObjectMapper retVal = new DefaultObjectMapper(smileFactory); |
| retVal.getFactory().setCodec(retVal); |
| return retVal; |
| } |
| |
| @After |
| public void tearDown() throws Exception |
| { |
| baseView.stop(); |
| tearDownServerAndCurator(); |
| } |
| } |