more logging for curator events
diff --git a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
index ccb2846..c340544 100644
--- a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
+++ b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
@@ -127,7 +127,7 @@
     }
   }
 
-  private void addServer(DruidServer server)
+  private QueryableDruidServer addServer(DruidServer server)
   {
     QueryableDruidServer exists = clients.put(
         server.getName(),
@@ -136,6 +136,8 @@
     if (exists != null) {
       log.warn("QueryRunner for server[%s] already existed!?", server);
     }
+
+    return exists;
   }
 
   private DirectDruidClient makeDirectClient(DruidServer server)
@@ -143,12 +145,13 @@
     return new DirectDruidClient(warehose, smileMapper, httpClient, server.getHost());
   }
 
-  private void removeServer(DruidServer server)
+  private QueryableDruidServer removeServer(DruidServer server)
   {
-    clients.remove(server.getName());
+    QueryableDruidServer retVal = clients.remove(server.getName());
     for (DataSegment segment : server.getSegments().values()) {
       serverRemovedSegment(server, segment);
     }
+    return retVal;
   }
 
   private void serverAddedSegment(final DruidServer server, final DataSegment segment)
@@ -171,10 +174,11 @@
         selectors.put(segmentId, selector);
       }
 
-      if (!clients.containsKey(server.getName())) {
-        addServer(server);
+      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
+      if (queryableDruidServer == null) {
+        queryableDruidServer = addServer(server);
       }
-      selector.addServer(clients.get(server.getName()));
+      selector.addServer(queryableDruidServer);
     }
   }
 
@@ -236,6 +240,7 @@
       QueryableDruidServer queryableDruidServer = clients.get(server.getName());
       if (queryableDruidServer == null) {
         log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
+        return null;
       }
       return queryableDruidServer.getClient();
     }
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
index 4c6b302..a61f46f 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
@@ -172,6 +172,16 @@
             final DataSegment segment = container.getSegment(inventoryKey);
             final DruidServer retVal = container.removeDataSegment(inventoryKey);
 
+            if (segment == null) {
+              log.warn(
+                  "Not running callbacks or cleanup for non-existing segment[%s] on server[%s]",
+                  inventoryKey,
+                  container.getName()
+              );
+
+              return retVal;
+            }
+
             runSegmentCallbacks(
                 new Function<SegmentCallback, CallbackAction>()
                 {
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
index c1cb16d..f308554 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
@@ -44,10 +44,10 @@
 /**
  * An InventoryManager watches updates to inventory on Zookeeper (or some other discovery-like service publishing
  * system).  It is built up on two object types: containers and inventory objects.
- *
+ * <p/>
  * The logic of the InventoryManager just maintains a local cache of the containers and inventory it sees on ZK.  It
  * provides methods for getting at the container objects, which house the actual individual pieces of inventory.
- *
+ * <p/>
  * A Strategy is provided to the constructor of an Inventory manager, this strategy provides all of the
  * object-specific logic to serialize, deserialize, compose and alter the container and inventory objects.
  */
@@ -128,8 +128,7 @@
       final ContainerHolder containerHolder = containers.remove(containerKey);
       if (containerHolder == null) {
         log.wtf("!?  Got key[%s] from keySet() but it didn't have a value!?", containerKey);
-      }
-      else {
+      } else {
         // This close() call actually calls shutdownNow() on the executor registered with the Cache object...
         containerHolder.getCache().close();
       }
@@ -202,52 +201,58 @@
 
       switch (event.getType()) {
         case CHILD_ADDED:
-          container = strategy.deserializeContainer(child.getData());
+          synchronized (lock) {
+            container = strategy.deserializeContainer(child.getData());
 
-          // This would normally be a race condition, but the only thing that should be mutating the containers
-          // map is this listener, which should never run concurrently.  If the same container is going to disappear
-          // and come back, we expect a removed event in between.
-          if (containers.containsKey(containerKey)) {
-            log.error("New node[%s] but there was already one.  That's not good, ignoring new one.", child.getPath());
-          }
+            // This would normally be a race condition, but the only thing that should be mutating the containers
+            // map is this listener, which should never run concurrently.  If the same container is going to disappear
+            // and come back, we expect a removed event in between.
+            if (containers.containsKey(containerKey)) {
+              log.error("New node[%s] but there was already one.  That's not good, ignoring new one.", child.getPath());
+            } else {
+              final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
+              PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
+              inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
 
-          final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
-          PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
-          inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
+              containers.put(containerKey, new ContainerHolder(container, inventoryCache));
 
-          containers.put(containerKey, new ContainerHolder(container, inventoryCache));
+              log.info("Starting inventory cache for %s", container);
+              inventoryCache.start();
+              strategy.newContainer(container);
+            }
 
-          inventoryCache.start();
-          strategy.newContainer(container);
-
-          break;
-        case CHILD_REMOVED:
-          final ContainerHolder removed = containers.remove(containerKey);
-          if (removed == null) {
-            log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
             break;
           }
-
-          // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
-          // better have its own executor or ignore shutdownNow() calls...
-          removed.getCache().close();
-          strategy.deadContainer(removed.getContainer());
-
-          break;
-        case CHILD_UPDATED:
-          container = strategy.deserializeContainer(child.getData());
-
-          ContainerHolder oldContainer = containers.get(containerKey);
-          if (oldContainer == null) {
-            log.warn("Container update[%s], but the old container didn't exist!?  Ignoring.", child.getPath());
-          }
-          else {
-            synchronized (oldContainer) {
-              oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
+        case CHILD_REMOVED:
+          synchronized (lock) {
+            final ContainerHolder removed = containers.remove(containerKey);
+            if (removed == null) {
+              log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
+              break;
             }
-          }
 
-          break;
+            // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
+            // better have its own executor or ignore shutdownNow() calls...
+            removed.getCache().close();
+            strategy.deadContainer(removed.getContainer());
+
+            break;
+          }
+        case CHILD_UPDATED:
+          synchronized (lock) {
+            container = strategy.deserializeContainer(child.getData());
+
+            ContainerHolder oldContainer = containers.get(containerKey);
+            if (oldContainer == null) {
+              log.warn("Container update[%s], but the old container didn't exist!?  Ignoring.", child.getPath());
+            } else {
+              synchronized (oldContainer) {
+                oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
+              }
+            }
+
+            break;
+          }
       }
     }