Merge pull request #1444 from druid-io/logging-improvement

Separate bootstrap threads from loading threads on historical startup
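
One of the changes below adds Gaussian jitter to the retry sleep in RemoteTaskActionClient, a common way to keep many tasks from retrying at the same instant. A minimal standalone sketch of that calculation follows; the class and method names here are illustrative only and are not part of the patch:

    import java.util.Random;

    public class RetryJitterSketch
    {
      private static final Random RANDOM = new Random();

      // Add zero-mean Gaussian noise with standard deviation delay / 4 to the
      // nominal delay, clamping at zero so the sleep is never negative.
      static long jitter(long delayMillis)
      {
        final double noise = RANDOM.nextGaussian() * delayMillis / 4.0;
        final long jittered = delayMillis + (long) noise;
        return jittered < 0 ? 0 : jittered;
      }

      public static void main(String[] args)
      {
        // A nominal 30s backoff mostly lands between roughly 15s and 45s (+/- 2 sigma).
        for (int i = 0; i < 5; i++) {
          System.out.println(jitter(30_000L));
        }
      }
    }

With a nominal 30s delay the noise has a standard deviation of 7.5s, so the jittered sleeps spread out instead of stacking up.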
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 1202ab5..6f87db2 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -51,7 +51,6 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -466,7 +465,6 @@
             config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
             segment,
             descriptorPath,
-            FileContext.getFileContext(descriptorPath.toUri(), context.getConfiguration()),
             context
         );
         for (File file : toMerge) {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 688bd80..22f1f60 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -23,19 +23,19 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.OutputSupplier;
+import com.metamx.common.FileUtils;
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
+import com.metamx.common.RetryUtils;
 import com.metamx.common.logger.Logger;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
 import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.util.Progressable;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.format.ISODateTimeFormat;
 
@@ -56,9 +57,9 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.ZipEntry;
@@ -75,6 +76,7 @@
 
   private static final int NUM_RETRIES = 8;
   private static final int SECONDS_BETWEEN_RETRIES = 2;
+  private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
 
   public static Path distributedClassPath(String path)
   {
@@ -222,7 +224,6 @@
       throws IOException
   {
     final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
-    final FileContext fileContext = FileContext.getFileContext(segmentBasePath.toUri(), configuration);
     final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
     final AtomicLong size = new AtomicLong(0L);
     final DataPusher zipPusher = (DataPusher) RetryProxy.create(
@@ -231,11 +232,11 @@
           @Override
           public long push() throws IOException
           {
-            try (OutputStream outputStream = fileContext.create(
+            try (OutputStream outputStream = outputFS.create(
                 tmpPath,
-                EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
-                Options.CreateOpts.createParent(),
-                Options.CreateOpts.bufferSize(256 * 1024)
+                true,
+                DEFAULT_FS_BUFFER_SIZE,
+                progressable
             )) {
               size.set(zipAndCopyDir(mergedBase, outputStream, progressable));
               outputStream.flush();
@@ -284,12 +285,20 @@
         .withLoadSpec(loadSpec)
         .withSize(size.get())
         .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
-    fileContext.rename(tmpPath, finalIndexZipFilePath, Options.Rename.OVERWRITE);
+
+    if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) {
+      throw new IOException(
+          String.format(
+              "Unable to rename [%s] to [%s]",
+              tmpPath.toUri().toString(),
+              finalIndexZipFilePath.toUri().toString()
+          )
+      );
+    }
     writeSegmentDescriptor(
         outputFS,
         finalSegment,
         new Path(segmentBasePath, "descriptor.json"),
-        fileContext,
         progressable
     );
     return finalSegment;
@@ -299,7 +308,6 @@
       final FileSystem outputFS,
       final DataSegment segment,
       final Path descriptorPath,
-      final FileContext fileContext,
       final Progressable progressable
   )
       throws IOException
@@ -313,22 +321,22 @@
             try {
               progressable.progress();
               if (outputFS.exists(descriptorPath)) {
-                if (!fileContext.delete(descriptorPath, false)) {
+                if (!outputFS.delete(descriptorPath, false)) {
                   throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
                 }
               }
-              try (final OutputStream descriptorOut = fileContext.create(
+              try (final OutputStream descriptorOut = outputFS.create(
                   descriptorPath,
-                  EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
-                  Options.CreateOpts.bufferSize(256 * 1024),
-                  Options.CreateOpts.createParent()
+                  true,
+                  DEFAULT_FS_BUFFER_SIZE,
+                  progressable
               )) {
                 HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
                 descriptorOut.flush();
               }
             }
             catch (RuntimeException | IOException ex) {
-              log.info(ex, "Error in retry loop");
+              log.info(ex, "Exception in descriptor pusher retry loop");
               throw ex;
             }
             return -1;
@@ -433,6 +441,80 @@
     return outputPath;
   }
 
+  /**
+   * Rename the index file. This works around limitations of both FileContext (which has no s3n support) and
+   * NativeS3FileSystem.rename, which will not overwrite an existing destination.
+   *
+   * @param outputFS              The output FileSystem
+   * @param indexZipFilePath      The original file path
+   * @param finalIndexZipFilePath The path to rename the original file to
+   *
+   * @return false if the rename failed, true otherwise (rename succeeded or no rename was needed)
+   */
+  private static boolean renameIndexFiles(
+      final FileSystem outputFS,
+      final Path indexZipFilePath,
+      final Path finalIndexZipFilePath
+  )
+  {
+    try {
+      return RetryUtils.retry(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              final boolean needRename;
+
+              if (outputFS.exists(finalIndexZipFilePath)) {
+                // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
+                final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
+                final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
+
+                if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
+                    || zipFile.getLen() != finalIndexZipFile.getLen()) {
+                  log.info(
+                      "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
+                      finalIndexZipFile.getPath(),
+                      new DateTime(finalIndexZipFile.getModificationTime()),
+                      finalIndexZipFile.getLen(),
+                      zipFile.getPath(),
+                      new DateTime(zipFile.getModificationTime()),
+                      zipFile.getLen()
+                  );
+                  outputFS.delete(finalIndexZipFilePath, false);
+                  needRename = true;
+                } else {
+                  log.info(
+                      "File[%s / %s / %sB] existed and will be kept",
+                      finalIndexZipFile.getPath(),
+                      new DateTime(finalIndexZipFile.getModificationTime()),
+                      finalIndexZipFile.getLen()
+                  );
+                  needRename = false;
+                }
+              } else {
+                needRename = true;
+              }
+
+              if (needRename) {
+                log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
+                return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
+              } else {
+                return true;
+              }
+            }
+          },
+          FileUtils.IS_EXCEPTION,
+          NUM_RETRIES
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+
   public static Path prependFSIfNullScheme(FileSystem fs, Path path)
   {
     if (path.toUri().getScheme() == null) {
@@ -455,38 +537,40 @@
           @Override
           public long push() throws IOException
           {
-            final FileContext context = FileContext.getFileContext(zip.toUri(), configuration);
-            long size = 0L;
-            final byte[] buffer = new byte[1 << 13];
-            progressable.progress();
-            try (ZipInputStream in = new ZipInputStream(context.open(zip, 1 << 13))) {
-              for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
-                final String fileName = entry.getName();
-                try (final OutputStream out = new BufferedOutputStream(
-                    new FileOutputStream(
-                        outDir.getAbsolutePath()
-                        + File.separator
-                        + fileName
-                    ), 1 << 13
-                )) {
-                  for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
-                    progressable.progress();
-                    if (len == 0) {
-                      continue;
+            try {
+              final FileSystem fileSystem = zip.getFileSystem(configuration);
+              long size = 0L;
+              final byte[] buffer = new byte[1 << 13];
+              progressable.progress();
+              try (ZipInputStream in = new ZipInputStream(fileSystem.open(zip, 1 << 13))) {
+                for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
+                  final String fileName = entry.getName();
+                  try (final OutputStream out = new BufferedOutputStream(
+                      new FileOutputStream(
+                          outDir.getAbsolutePath()
+                          + File.separator
+                          + fileName
+                      ), 1 << 13
+                  )) {
+                    for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
+                      progressable.progress();
+                      if (len == 0) {
+                        continue;
+                      }
+                      size += len;
+                      out.write(buffer, 0, len);
                     }
-                    size += len;
-                    out.write(buffer, 0, len);
+                    out.flush();
                   }
-                  out.flush();
                 }
               }
+              progressable.progress();
+              return size;
             }
             catch (IOException | RuntimeException exception) {
-              log.error(exception, "Exception in retry loop");
+              log.error(exception, "Exception in unzip retry loop");
               throw exception;
             }
-            progressable.progress();
-            return size;
           }
         },
         RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
index 884ce1a..91e56ce 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java
@@ -65,6 +65,7 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -451,6 +452,7 @@
   }
 
   @Test
+  @Ignore // This takes a long time due to retries
   public void testHadoopFailure() throws IOException, InterruptedException
   {
     final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
index 6b1e2d0..8f4d527 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
@@ -41,6 +41,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.Random;
 
 public class RemoteTaskActionClient implements TaskActionClient
 {
@@ -49,6 +50,7 @@
   private final ServerDiscoverySelector selector;
   private final RetryPolicyFactory retryPolicyFactory;
   private final ObjectMapper jsonMapper;
+  private final Random random = new Random();
 
   private static final Logger log = new Logger(RemoteTaskActionClient.class);
 
@@ -133,7 +135,7 @@
           throw e;
         } else {
           try {
-            final long sleepTime = delay.getMillis();
+            final long sleepTime = jitter(delay.getMillis());
             log.info("Will try again in [%s].", new Duration(sleepTime).toString());
             Thread.sleep(sleepTime);
           }
@@ -145,6 +147,12 @@
     }
   }
 
+  private long jitter(long input) {
+    final double jitter = random.nextGaussian() * input / 4.0;
+    final long retval = input + (long) jitter;
+    return retval < 0 ? 0 : retval;
+  }
+
   private URI makeServiceUri(final Server instance) throws URISyntaxException
   {
     return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java
new file mode 100644
index 0000000..0a9c8c0
--- /dev/null
+++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.Pair;
+import com.metamx.http.client.HttpClient;
+import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import io.druid.client.selector.RandomServerSelectorStrategy;
+import io.druid.client.selector.ServerSelector;
+import io.druid.curator.CuratorTestBase;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.QueryToolChestWarehouse;
+import io.druid.query.QueryWatcher;
+import io.druid.query.TableDataSource;
+import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.initialization.ZkPathsConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineLookup;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.PartitionHolder;
+import io.druid.timeline.partition.SingleElementPartitionChunk;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+public class BrokerServerViewTest extends CuratorTestBase
+{
+  private final ObjectMapper jsonMapper;
+  private final ZkPathsConfig zkPathsConfig;
+  private final String announcementsPath;
+  private final String inventoryPath;
+
+  private CountDownLatch segmentViewInitLatch;
+  private CountDownLatch segmentAddedLatch;
+  private CountDownLatch segmentRemovedLatch;
+
+  private ServerInventoryView baseView;
+  private BrokerServerView brokerServerView;
+
+  public BrokerServerViewTest()
+  {
+    jsonMapper = new DefaultObjectMapper();
+    zkPathsConfig = new ZkPathsConfig();
+    announcementsPath = zkPathsConfig.getAnnouncementsPath();
+    inventoryPath = zkPathsConfig.getLiveSegmentsPath();
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    setupServerAndCurator();
+    curator.start();
+  }
+
+  @Test
+  public void testSingleServerAddedRemovedSegment() throws Exception
+  {
+    segmentViewInitLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(1);
+    segmentRemovedLatch = new CountDownLatch(1);
+
+    setupViews();
+
+    final DruidServer druidServer = new DruidServer(
+        "localhost:1234",
+        "localhost:1234",
+        10000000L,
+        "historical",
+        "default_tier",
+        0
+    );
+
+    setupZNodeForServer(druidServer);
+
+    final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
+    announceSegmentForServer(druidServer, segment);
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup(
+        new Interval(
+            "2014-10-20T00:00:00Z/P1D"
+        )
+    );
+    Assert.assertEquals(1, serverLookupRes.size());
+
+    TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = serverLookupRes.get(0);
+    Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval());
+    Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());
+
+    PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject();
+    Assert.assertTrue(actualPartitionHolder.isComplete());
+    Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
+
+    ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
+                                                                                                  .next()).getObject();
+    Assert.assertFalse(selector.isEmpty());
+    Assert.assertEquals(segment, selector.getSegment());
+    Assert.assertEquals(druidServer, selector.pick().getServer());
+
+    unannounceSegmentForServer(druidServer, segment);
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+    Assert.assertEquals(
+        0,
+        ((List<TimelineObjectHolder>) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size()
+    );
+    Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1"));
+  }
+
+  @Test
+  public void testMultipleServerAddedRemovedSegment() throws Exception
+  {
+    segmentViewInitLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(5);
+
+    // temporarily set latch count to 1
+    segmentRemovedLatch = new CountDownLatch(1);
+
+    setupViews();
+
+    final List<DruidServer> druidServers = Lists.transform(
+        ImmutableList.<String>of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
+        new Function<String, DruidServer>()
+        {
+          @Override
+          public DruidServer apply(String input)
+          {
+            return new DruidServer(
+                input,
+                input,
+                10000000L,
+                "historical",
+                "default_tier",
+                0
+            );
+          }
+        }
+    );
+
+    for (DruidServer druidServer : druidServers) {
+      setupZNodeForServer(druidServer);
+    }
+
+    final List<DataSegment> segments = Lists.transform(
+        ImmutableList.<Pair<String, String>>of(
+            Pair.of("2011-04-01/2011-04-03", "v1"),
+            Pair.of("2011-04-03/2011-04-06", "v1"),
+            Pair.of("2011-04-01/2011-04-09", "v2"),
+            Pair.of("2011-04-06/2011-04-09", "v3"),
+            Pair.of("2011-04-01/2011-04-02", "v3")
+        ), new Function<Pair<String, String>, DataSegment>()
+        {
+          @Override
+          public DataSegment apply(Pair<String, String> input)
+          {
+            return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
+          }
+        }
+    );
+
+    for (int i = 0; i < 5; ++i) {
+      announceSegmentForServer(druidServers.get(i), segments.get(i));
+    }
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    assertValues(
+        Arrays.asList(
+            createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+            createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
+            createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+        ),
+        (List<TimelineObjectHolder>) timeline.lookup(
+            new Interval(
+                "2011-04-01/2011-04-09"
+            )
+        )
+    );
+
+    // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
+    unannounceSegmentForServer(druidServers.get(2), segments.get(2));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+    // renew segmentRemovedLatch since we still have 4 segments to unannounce
+    segmentRemovedLatch = new CountDownLatch(4);
+
+    timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view"));
+    assertValues(
+        Arrays.asList(
+            createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+            createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)),
+            createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)),
+            createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+        ),
+        (List<TimelineObjectHolder>) timeline.lookup(
+            new Interval(
+                "2011-04-01/2011-04-09"
+            )
+        )
+    );
+
+    // unannounce all the segments
+    for (int i = 0; i < 5; ++i) {
+      // skip the one that was previously unannounced
+      if (i != 2) {
+        unannounceSegmentForServer(druidServers.get(i), segments.get(i));
+      }
+    }
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+    Assert.assertEquals(
+        0,
+        ((List<TimelineObjectHolder>) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size()
+    );
+  }
+
+  private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
+  {
+    curator.create()
+           .compressed()
+           .withMode(CreateMode.EPHEMERAL)
+           .forPath(
+               ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
+               jsonMapper.writeValueAsBytes(
+                   ImmutableSet.<DataSegment>of(segment)
+               )
+           );
+  }
+
+  private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
+  {
+    curator.delete().guaranteed().forPath(
+        ZKPaths.makePath(
+            ZKPaths.makePath(inventoryPath, druidServer.getHost()),
+            segment.getIdentifier()
+        )
+    );
+  }
+
+  private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
+      String intervalStr,
+      String version,
+      DruidServer druidServer,
+      DataSegment segment
+  )
+  {
+    return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment)));
+  }
+
+  private void assertValues(
+      List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual
+  )
+  {
+    Assert.assertEquals(expected.size(), actual.size());
+
+    for (int i = 0; i < expected.size(); ++i) {
+      Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
+      TimelineObjectHolder<String, ServerSelector> actualTimelineObjectHolder = actual.get(i);
+
+      Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
+      Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion());
+
+      PartitionHolder<ServerSelector> actualPartitionHolder = actualTimelineObjectHolder.getObject();
+      Assert.assertTrue(actualPartitionHolder.isComplete());
+      Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
+
+      ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
+                                                                                                    .next()).getObject();
+      Assert.assertFalse(selector.isEmpty());
+      Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick().getServer());
+      Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment());
+    }
+  }
+
+  private void setupViews() throws Exception
+  {
+    baseView = new BatchServerInventoryView(
+        zkPathsConfig,
+        curator,
+        jsonMapper,
+        Predicates.<DataSegment>alwaysTrue()
+    )
+    {
+      @Override
+      public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
+      {
+        super.registerSegmentCallback(
+            exec, new SegmentCallback()
+            {
+              @Override
+              public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+              {
+                CallbackAction res = callback.segmentAdded(server, segment);
+                segmentAddedLatch.countDown();
+                return res;
+              }
+
+              @Override
+              public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
+              {
+                CallbackAction res = callback.segmentRemoved(server, segment);
+                segmentRemovedLatch.countDown();
+                return res;
+              }
+
+              @Override
+              public CallbackAction segmentViewInitialized()
+              {
+                CallbackAction res = callback.segmentViewInitialized();
+                segmentViewInitLatch.countDown();
+                return res;
+              }
+            }
+        );
+      }
+    };
+
+    baseView.start();
+
+    brokerServerView = new BrokerServerView(
+        EasyMock.createMock(QueryToolChestWarehouse.class),
+        EasyMock.createMock(QueryWatcher.class),
+        getSmileMapper(),
+        EasyMock.createMock(HttpClient.class),
+        baseView,
+        new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
+    );
+  }
+
+  private void setupZNodeForServer(DruidServer server) throws Exception
+  {
+    curator.create()
+           .creatingParentsIfNeeded()
+           .forPath(
+               ZKPaths.makePath(announcementsPath, server.getHost()),
+               jsonMapper.writeValueAsBytes(server.getMetadata())
+           );
+    curator.create()
+           .creatingParentsIfNeeded()
+           .forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
+  }
+
+  private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version)
+  {
+    return DataSegment.builder()
+                      .dataSource("test_broker_server_view")
+                      .interval(new Interval(intervalStr))
+                      .loadSpec(
+                          ImmutableMap.<String, Object>of(
+                              "type",
+                              "local",
+                              "path",
+                              "somewhere"
+                          )
+                      )
+                      .version(version)
+                      .dimensions(ImmutableList.<String>of())
+                      .metrics(ImmutableList.<String>of())
+                      .shardSpec(new NoneShardSpec())
+                      .binaryVersion(9)
+                      .size(0)
+                      .build();
+  }
+
+  public ObjectMapper getSmileMapper()
+  {
+    final SmileFactory smileFactory = new SmileFactory();
+    smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
+    smileFactory.delegateToTextual(true);
+    final ObjectMapper retVal = new DefaultObjectMapper(smileFactory);
+    retVal.getFactory().setCodec(retVal);
+    return retVal;
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    baseView.stop();
+    tearDownServerAndCurator();
+  }
+}