Merge pull request #1444 from druid-io/logging-improvement
Separate bootstrap threads from loading threads on historical startup
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 1202ab5..6f87db2 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -51,7 +51,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -466,7 +465,6 @@
config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
segment,
descriptorPath,
- FileContext.getFileContext(descriptorPath.toUri(), context.getConfiguration()),
context
);
for (File file : toMerge) {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 688bd80..22f1f60 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -23,19 +23,19 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
+import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
+import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
@@ -45,6 +45,7 @@
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
@@ -56,9 +57,9 @@
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
-import java.util.EnumSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
@@ -75,6 +76,7 @@
private static final int NUM_RETRIES = 8;
private static final int SECONDS_BETWEEN_RETRIES = 2;
+ private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
public static Path distributedClassPath(String path)
{
@@ -222,7 +224,6 @@
throws IOException
{
final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
- final FileContext fileContext = FileContext.getFileContext(segmentBasePath.toUri(), configuration);
final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
final AtomicLong size = new AtomicLong(0L);
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
@@ -231,11 +232,11 @@
@Override
public long push() throws IOException
{
- try (OutputStream outputStream = fileContext.create(
+ try (OutputStream outputStream = outputFS.create(
tmpPath,
- EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
- Options.CreateOpts.createParent(),
- Options.CreateOpts.bufferSize(256 * 1024)
+ true,
+ DEFAULT_FS_BUFFER_SIZE,
+ progressable
)) {
size.set(zipAndCopyDir(mergedBase, outputStream, progressable));
outputStream.flush();
@@ -284,12 +285,20 @@
.withLoadSpec(loadSpec)
.withSize(size.get())
.withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
- fileContext.rename(tmpPath, finalIndexZipFilePath, Options.Rename.OVERWRITE);
+
+ if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) {
+ throw new IOException(
+ String.format(
+ "Unable to rename [%s] to [%s]",
+ tmpPath.toUri().toString(),
+ finalIndexZipFilePath.toUri().toString()
+ )
+ );
+ }
writeSegmentDescriptor(
outputFS,
finalSegment,
new Path(segmentBasePath, "descriptor.json"),
- fileContext,
progressable
);
return finalSegment;
@@ -299,7 +308,6 @@
final FileSystem outputFS,
final DataSegment segment,
final Path descriptorPath,
- final FileContext fileContext,
final Progressable progressable
)
throws IOException
@@ -313,22 +321,22 @@
try {
progressable.progress();
if (outputFS.exists(descriptorPath)) {
- if (!fileContext.delete(descriptorPath, false)) {
+ if (!outputFS.delete(descriptorPath, false)) {
throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
}
}
- try (final OutputStream descriptorOut = fileContext.create(
+ try (final OutputStream descriptorOut = outputFS.create(
descriptorPath,
- EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
- Options.CreateOpts.bufferSize(256 * 1024),
- Options.CreateOpts.createParent()
+ true,
+ DEFAULT_FS_BUFFER_SIZE,
+ progressable
)) {
HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
descriptorOut.flush();
}
}
catch (RuntimeException | IOException ex) {
- log.info(ex, "Error in retry loop");
+ log.info(ex, "Exception in descriptor pusher retry loop");
throw ex;
}
return -1;
@@ -433,6 +441,80 @@
return outputPath;
}
+ /**
+ * Rename the index file. This works around limitations of both FileContext (no s3n support) and
+ * NativeS3FileSystem.rename, which will not overwrite an existing destination.
+ *
+ * @param outputFS The output filesystem
+ * @param indexZipFilePath The original file path
+ * @param finalIndexZipFilePath The path to rename the original file to
+ *
+ * @return false if a rename failed, true otherwise (rename succeeded or no rename was needed)
+ */
+ private static boolean renameIndexFiles(
+ final FileSystem outputFS,
+ final Path indexZipFilePath,
+ final Path finalIndexZipFilePath
+ )
+ {
+ try {
+ return RetryUtils.retry(
+ new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ final boolean needRename;
+
+ if (outputFS.exists(finalIndexZipFilePath)) {
+ // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
+ final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
+ final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
+
+ if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
+ || zipFile.getLen() != finalIndexZipFile.getLen()) {
+ log.info(
+ "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
+ finalIndexZipFile.getPath(),
+ new DateTime(finalIndexZipFile.getModificationTime()),
+ finalIndexZipFile.getLen(),
+ zipFile.getPath(),
+ new DateTime(zipFile.getModificationTime()),
+ zipFile.getLen()
+ );
+ outputFS.delete(finalIndexZipFilePath, false);
+ needRename = true;
+ } else {
+ log.info(
+ "File[%s / %s / %sB] existed and will be kept",
+ finalIndexZipFile.getPath(),
+ new DateTime(finalIndexZipFile.getModificationTime()),
+ finalIndexZipFile.getLen()
+ );
+ needRename = false;
+ }
+ } else {
+ needRename = true;
+ }
+
+ if (needRename) {
+ log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
+ return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
+ } else {
+ return true;
+ }
+ }
+ },
+ FileUtils.IS_EXCEPTION,
+ NUM_RETRIES
+ );
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+
public static Path prependFSIfNullScheme(FileSystem fs, Path path)
{
if (path.toUri().getScheme() == null) {
@@ -455,38 +537,40 @@
@Override
public long push() throws IOException
{
- final FileContext context = FileContext.getFileContext(zip.toUri(), configuration);
- long size = 0L;
- final byte[] buffer = new byte[1 << 13];
- progressable.progress();
- try (ZipInputStream in = new ZipInputStream(context.open(zip, 1 << 13))) {
- for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
- final String fileName = entry.getName();
- try (final OutputStream out = new BufferedOutputStream(
- new FileOutputStream(
- outDir.getAbsolutePath()
- + File.separator
- + fileName
- ), 1 << 13
- )) {
- for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
- progressable.progress();
- if (len == 0) {
- continue;
+ try {
+ final FileSystem fileSystem = zip.getFileSystem(configuration);
+ long size = 0L;
+ final byte[] buffer = new byte[1 << 13];
+ progressable.progress();
+ try (ZipInputStream in = new ZipInputStream(fileSystem.open(zip, 1 << 13))) {
+ for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
+ final String fileName = entry.getName();
+ try (final OutputStream out = new BufferedOutputStream(
+ new FileOutputStream(
+ outDir.getAbsolutePath()
+ + File.separator
+ + fileName
+ ), 1 << 13
+ )) {
+ for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
+ progressable.progress();
+ if (len == 0) {
+ continue;
+ }
+ size += len;
+ out.write(buffer, 0, len);
}
- size += len;
- out.write(buffer, 0, len);
+ out.flush();
}
- out.flush();
}
}
+ progressable.progress();
+ return size;
}
catch (IOException | RuntimeException exception) {
- log.error(exception, "Exception in retry loop");
+ log.error(exception, "Exception in unzip retry loop");
throw exception;
}
- progressable.progress();
- return size;
}
},
RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
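
Note on the JobHelper changes above: the new renameIndexFiles helper exists because NativeS3FileSystem.rename will not replace an existing destination, so the stale copy may need to be deleted first. A minimal standalone sketch of that pattern, with a hypothetical class and method name (not part of this patch) and assuming only a Hadoop FileSystem handle and two Paths:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class OverwriteAwareRename
    {
      // Returns the result of FileSystem.rename after making sure the destination can be written over.
      public static boolean renameOverwriting(FileSystem fs, Path src, Path dst) throws IOException
      {
        if (fs.exists(dst)) {
          // NativeS3FileSystem.rename will not overwrite, so drop the existing destination first.
          fs.delete(dst, false);
        }
        return fs.rename(src, dst);
      }
    }

The helper in the patch goes further: it compares modification times and lengths to decide whether the existing copy can simply be kept, and wraps the whole decision in RetryUtils.retry so transient filesystem failures are retried.
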
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
index 884ce1a..91e56ce 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
@@ -65,6 +65,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -451,6 +452,7 @@
}
@Test
+ @Ignore // This takes a long time due to retries
public void testHadoopFailure() throws IOException, InterruptedException
{
final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
index 6b1e2d0..8f4d527 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
@@ -41,6 +41,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.Random;
public class RemoteTaskActionClient implements TaskActionClient
{
@@ -49,6 +50,7 @@
private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
+ private final Random random = new Random();
private static final Logger log = new Logger(RemoteTaskActionClient.class);
@@ -133,7 +135,7 @@
throw e;
} else {
try {
- final long sleepTime = delay.getMillis();
+ final long sleepTime = jitter(delay.getMillis());
log.info("Will try again in [%s].", new Duration(sleepTime).toString());
Thread.sleep(sleepTime);
}
@@ -145,6 +147,12 @@
}
}
+ private long jitter(long input)
+ {
+ final double jitter = random.nextGaussian() * input / 4.0;
+ long retval = input + (long) jitter;
+ return retval < 0 ? 0 : retval;
+ }
+
private URI makeServiceUri(final Server instance) throws URISyntaxException
{
return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
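
The jitter added above spreads out retry storms by perturbing each sleep with Gaussian noise whose standard deviation is a quarter of the nominal delay, clamped at zero. A self-contained sketch of the same arithmetic (the class name and the 30 second base delay are illustrative, not from the patch):

    import java.util.Random;

    public class JitterSketch
    {
      public static void main(String[] args)
      {
        final Random random = new Random();
        final long baseDelayMillis = 30000L; // nominal retry delay, e.g. 30 seconds

        for (int i = 0; i < 5; i++) {
          // Gaussian noise with standard deviation baseDelayMillis / 4, clamped so the sleep is never negative.
          final double noise = random.nextGaussian() * baseDelayMillis / 4.0;
          final long jittered = Math.max(0L, baseDelayMillis + (long) noise);
          System.out.printf("attempt %d: would sleep %d ms%n", i + 1, jittered);
        }
      }
    }
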
diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java
new file mode 100644
index 0000000..0a9c8c0
--- /dev/null
+++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.Pair;
+import com.metamx.http.client.HttpClient;
+import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import io.druid.client.selector.RandomServerSelectorStrategy;
+import io.druid.client.selector.ServerSelector;
+import io.druid.curator.CuratorTestBase;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.QueryToolChestWarehouse;
+import io.druid.query.QueryWatcher;
+import io.druid.query.TableDataSource;
+import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.initialization.ZkPathsConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineLookup;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.PartitionHolder;
+import io.druid.timeline.partition.SingleElementPartitionChunk;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+public class BrokerServerViewTest extends CuratorTestBase
+{
+ private final ObjectMapper jsonMapper;
+ private final ZkPathsConfig zkPathsConfig;
+ private final String announcementsPath;
+ private final String inventoryPath;
+
+ private CountDownLatch segmentViewInitLatch;
+ private CountDownLatch segmentAddedLatch;
+ private CountDownLatch segmentRemovedLatch;
+
+ private ServerInventoryView baseView;
+ private BrokerServerView brokerServerView;
+
+ public BrokerServerViewTest()
+ {
+ jsonMapper = new DefaultObjectMapper();
+ zkPathsConfig = new ZkPathsConfig();
+ announcementsPath = zkPathsConfig.getAnnouncementsPath();
+ inventoryPath = zkPathsConfig.getLiveSegmentsPath();
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ setupServerAndCurator();
+ curator.start();
+ }
+
+ @Test
+ public void testSingleServerAddedRemovedSegment() throws Exception
+ {
+ segmentViewInitLatch = new CountDownLatch(1);
+ segmentAddedLatch = new CountDownLatch(1);
+ segmentRemovedLatch = new CountDownLatch(1);
+
+ setupViews();
+
+ final DruidServer druidServer = new DruidServer(
+ "localhost:1234",
+ "localhost:1234",
+ 10000000L,
+ "historical",
+ "default_tier",
+ 0
+ );
+
+ setupZNodeForServer(druidServer);
+
+ final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
+ announceSegmentForServer(druidServer, segment);
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+ TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+ List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup(
+ new Interval(
+ "2014-10-20T00:00:00Z/P1D"
+ )
+ );
+ Assert.assertEquals(1, serverLookupRes.size());
+
+ TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = serverLookupRes.get(0);
+ Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval());
+ Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());
+
+ PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject();
+ Assert.assertTrue(actualPartitionHolder.isComplete());
+ Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
+
+ ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
+ .next()).getObject();
+ Assert.assertFalse(selector.isEmpty());
+ Assert.assertEquals(segment, selector.getSegment());
+ Assert.assertEquals(druidServer, selector.pick().getServer());
+
+ unannounceSegmentForServer(druidServer, segment);
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+ Assert.assertEquals(
+ 0,
+ ((List<TimelineObjectHolder>) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size()
+ );
+ Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1"));
+ }
+
+ @Test
+ public void testMultipleServerAddedRemovedSegment() throws Exception
+ {
+ segmentViewInitLatch = new CountDownLatch(1);
+ segmentAddedLatch = new CountDownLatch(5);
+
+ // temporarily set latch count to 1
+ segmentRemovedLatch = new CountDownLatch(1);
+
+ setupViews();
+
+ final List<DruidServer> druidServers = Lists.transform(
+ ImmutableList.<String>of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
+ new Function<String, DruidServer>()
+ {
+ @Override
+ public DruidServer apply(String input)
+ {
+ return new DruidServer(
+ input,
+ input,
+ 10000000L,
+ "historical",
+ "default_tier",
+ 0
+ );
+ }
+ }
+ );
+
+ for (DruidServer druidServer : druidServers) {
+ setupZNodeForServer(druidServer);
+ }
+
+ final List<DataSegment> segments = Lists.transform(
+ ImmutableList.<Pair<String, String>>of(
+ Pair.of("2011-04-01/2011-04-03", "v1"),
+ Pair.of("2011-04-03/2011-04-06", "v1"),
+ Pair.of("2011-04-01/2011-04-09", "v2"),
+ Pair.of("2011-04-06/2011-04-09", "v3"),
+ Pair.of("2011-04-01/2011-04-02", "v3")
+ ), new Function<Pair<String, String>, DataSegment>()
+ {
+ @Override
+ public DataSegment apply(Pair<String, String> input)
+ {
+ return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
+ }
+ }
+ );
+
+ for (int i = 0; i < 5; ++i) {
+ announceSegmentForServer(druidServers.get(i), segments.get(i));
+ }
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+ TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+ assertValues(
+ Arrays.asList(
+ createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+ createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
+ createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+ ),
+ (List<TimelineObjectHolder>) timeline.lookup(
+ new Interval(
+ "2011-04-01/2011-04-09"
+ )
+ )
+ );
+
+ // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
+ unannounceSegmentForServer(druidServers.get(2), segments.get(2));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+ // renew segmentRemovedLatch since we still have 4 segments to unannounce
+ segmentRemovedLatch = new CountDownLatch(4);
+
+ timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+ assertValues(
+ Arrays.asList(
+ createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+ createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)),
+ createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)),
+ createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+ ),
+ (List<TimelineObjectHolder>) timeline.lookup(
+ new Interval(
+ "2011-04-01/2011-04-09"
+ )
+ )
+ );
+
+ // unannounce all the segments
+ for (int i = 0; i < 5; ++i) {
+ // skip the one that was previously unannounced
+ if (i != 2) {
+ unannounceSegmentForServer(druidServers.get(i), segments.get(i));
+ }
+ }
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+ Assert.assertEquals(
+ 0,
+ ((List<TimelineObjectHolder>) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size()
+ );
+ }
+
+ private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
+ {
+ curator.create()
+ .compressed()
+ .withMode(CreateMode.EPHEMERAL)
+ .forPath(
+ ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
+ jsonMapper.writeValueAsBytes(
+ ImmutableSet.<DataSegment>of(segment)
+ )
+ );
+ }
+
+ private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
+ {
+ curator.delete().guaranteed().forPath(
+ ZKPaths.makePath(
+ ZKPaths.makePath(inventoryPath, druidServer.getHost()),
+ segment.getIdentifier()
+ )
+ );
+ }
+
+ private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
+ String intervalStr,
+ String version,
+ DruidServer druidServer,
+ DataSegment segment
+ )
+ {
+ return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment)));
+ }
+
+ private void assertValues(
+ List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual
+ )
+ {
+ Assert.assertEquals(expected.size(), actual.size());
+
+ for (int i = 0; i < expected.size(); ++i) {
+ Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
+ TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = actual.get(i);
+
+ Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
+ Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion());
+
+ PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject();
+ Assert.assertTrue(actualPartitionHolder.isComplete());
+ Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
+
+ ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
+ .next()).getObject();
+ Assert.assertFalse(selector.isEmpty());
+ Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick().getServer());
+ Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment());
+ }
+ }
+
+ private void setupViews() throws Exception
+ {
+ baseView = new BatchServerInventoryView(
+ zkPathsConfig,
+ curator,
+ jsonMapper,
+ Predicates.<DataSegment>alwaysTrue()
+ )
+ {
+ @Override
+ public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
+ {
+ super.registerSegmentCallback(
+ exec, new SegmentCallback()
+ {
+ @Override
+ public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+ {
+ CallbackAction res = callback.segmentAdded(server, segment);
+ segmentAddedLatch.countDown();
+ return res;
+ }
+
+ @Override
+ public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
+ {
+ CallbackAction res = callback.segmentRemoved(server, segment);
+ segmentRemovedLatch.countDown();
+ return res;
+ }
+
+ @Override
+ public CallbackAction segmentViewInitialized()
+ {
+ CallbackAction res = callback.segmentViewInitialized();
+ segmentViewInitLatch.countDown();
+ return res;
+ }
+ }
+ );
+ }
+ };
+
+ baseView.start();
+
+ brokerServerView = new BrokerServerView(
+ EasyMock.createMock(QueryToolChestWarehouse.class),
+ EasyMock.createMock(QueryWatcher.class),
+ getSmileMapper(),
+ EasyMock.createMock(HttpClient.class),
+ baseView,
+ new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
+ );
+ }
+
+ private void setupZNodeForServer(DruidServer server) throws Exception
+ {
+ curator.create()
+ .creatingParentsIfNeeded()
+ .forPath(
+ ZKPaths.makePath(announcementsPath, server.getHost()),
+ jsonMapper.writeValueAsBytes(server.getMetadata())
+ );
+ curator.create()
+ .creatingParentsIfNeeded()
+ .forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
+ }
+
+ private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version)
+ {
+ return DataSegment.builder()
+ .dataSource("test_broker_server_view")
+ .interval(new Interval(intervalStr))
+ .loadSpec(
+ ImmutableMap.<String, Object>of(
+ "type",
+ "local",
+ "path",
+ "somewhere"
+ )
+ )
+ .version(version)
+ .dimensions(ImmutableList.<String>of())
+ .metrics(ImmutableList.<String>of())
+ .shardSpec(new NoneShardSpec())
+ .binaryVersion(9)
+ .size(0)
+ .build();
+ }
+
+ public ObjectMapper getSmileMapper()
+ {
+ final SmileFactory smileFactory = new SmileFactory();
+ smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
+ smileFactory.delegateToTextual(true);
+ final ObjectMapper retVal = new DefaultObjectMapper(smileFactory);
+ retVal.getFactory().setCodec(retVal);
+ return retVal;
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ baseView.stop();
+ tearDownServerAndCurator();
+ }
+}