more logging for curator events
diff --git a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
index ccb2846..c340544 100644
--- a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
+++ b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java
@@ -127,7 +127,7 @@
}
}
- private void addServer(DruidServer server)
+ private QueryableDruidServer addServer(DruidServer server)
{
QueryableDruidServer exists = clients.put(
server.getName(),
@@ -136,6 +136,8 @@
if (exists != null) {
log.warn("QueryRunner for server[%s] already existed!?", server);
}
+
+ return exists;
}
private DirectDruidClient makeDirectClient(DruidServer server)
@@ -143,12 +145,13 @@
return new DirectDruidClient(warehose, smileMapper, httpClient, server.getHost());
}
- private void removeServer(DruidServer server)
+ private QueryableDruidServer removeServer(DruidServer server)
{
- clients.remove(server.getName());
+ QueryableDruidServer retVal = clients.remove(server.getName());
for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server, segment);
}
+ return retVal;
}
private void serverAddedSegment(final DruidServer server, final DataSegment segment)
@@ -171,10 +174,11 @@
selectors.put(segmentId, selector);
}
- if (!clients.containsKey(server.getName())) {
- addServer(server);
+ QueryableDruidServer queryableDruidServer = clients.get(server.getName());
+ if (queryableDruidServer == null) {
+ queryableDruidServer = addServer(server);
}
- selector.addServer(clients.get(server.getName()));
+ selector.addServer(queryableDruidServer);
}
}
@@ -236,6 +240,7 @@
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
+ return null;
}
return queryableDruidServer.getClient();
}
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
index 4c6b302..a61f46f 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
@@ -172,6 +172,16 @@
final DataSegment segment = container.getSegment(inventoryKey);
final DruidServer retVal = container.removeDataSegment(inventoryKey);
+ if (segment == null) {
+ log.warn(
+ "Not running callbacks or cleanup for non-existing segment[%s] on server[%s]",
+ inventoryKey,
+ container.getName()
+ );
+
+ return retVal;
+ }
+
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
index c1cb16d..f308554 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
@@ -44,10 +44,10 @@
/**
* An InventoryManager watches updates to inventory on Zookeeper (or some other discovery-like service publishing
* system). It is built up on two object types: containers and inventory objects.
- *
+ * <p/>
* The logic of the InventoryManager just maintains a local cache of the containers and inventory it sees on ZK. It
* provides methods for getting at the container objects, which house the actual individual pieces of inventory.
- *
+ * <p/>
* A Strategy is provided to the constructor of an Inventory manager, this strategy provides all of the
* object-specific logic to serialize, deserialize, compose and alter the container and inventory objects.
*/
@@ -128,8 +128,7 @@
final ContainerHolder containerHolder = containers.remove(containerKey);
if (containerHolder == null) {
log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey);
- }
- else {
+ } else {
// This close() call actually calls shutdownNow() on the executor registered with the Cache object...
containerHolder.getCache().close();
}
@@ -202,52 +201,58 @@
switch (event.getType()) {
case CHILD_ADDED:
- container = strategy.deserializeContainer(child.getData());
+ synchronized (lock) {
+ container = strategy.deserializeContainer(child.getData());
- // This would normally be a race condition, but the only thing that should be mutating the containers
- // map is this listener, which should never run concurrently. If the same container is going to disappear
- // and come back, we expect a removed event in between.
- if (containers.containsKey(containerKey)) {
- log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
- }
+ // This would normally be a race condition, but the only thing that should be mutating the containers
+ // map is this listener, which should never run concurrently. If the same container is going to disappear
+ // and come back, we expect a removed event in between.
+ if (containers.containsKey(containerKey)) {
+ log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
+ } else {
+ final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
+ PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
+ inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
- final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
- PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
- inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
+ containers.put(containerKey, new ContainerHolder(container, inventoryCache));
- containers.put(containerKey, new ContainerHolder(container, inventoryCache));
+ log.info("Starting inventory cache for %s", container);
+ inventoryCache.start();
+ strategy.newContainer(container);
+ }
- inventoryCache.start();
- strategy.newContainer(container);
-
- break;
- case CHILD_REMOVED:
- final ContainerHolder removed = containers.remove(containerKey);
- if (removed == null) {
- log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
break;
}
-
- // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
- // better have its own executor or ignore shutdownNow() calls...
- removed.getCache().close();
- strategy.deadContainer(removed.getContainer());
-
- break;
- case CHILD_UPDATED:
- container = strategy.deserializeContainer(child.getData());
-
- ContainerHolder oldContainer = containers.get(containerKey);
- if (oldContainer == null) {
- log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
- }
- else {
- synchronized (oldContainer) {
- oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
+ case CHILD_REMOVED:
+ synchronized (lock) {
+ final ContainerHolder removed = containers.remove(containerKey);
+ if (removed == null) {
+ log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
+ break;
}
- }
- break;
+ // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
+ // better have its own executor or ignore shutdownNow() calls...
+ removed.getCache().close();
+ strategy.deadContainer(removed.getContainer());
+
+ break;
+ }
+ case CHILD_UPDATED:
+ synchronized (lock) {
+ container = strategy.deserializeContainer(child.getData());
+
+ ContainerHolder oldContainer = containers.get(containerKey);
+ if (oldContainer == null) {
+ log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
+ } else {
+ synchronized (oldContainer) {
+ oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
+ }
+ }
+
+ break;
+ }
}
}