blob: a61f46f58af651cce6b2ab56041c8a85c37e67d4 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A {@link ServerView} / {@link InventoryView} backed by a {@link CuratorInventoryManager}.
 *
 * <p>Containers are {@link DruidServer}s found under {@code zkPaths.getAnnouncementsPath()} and inventory items
 * are {@link DataSegment}s found under {@code zkPaths.getServedSegmentsPath()}. Both are (de)serialized with the
 * supplied {@link ObjectMapper}.
 *
 * <p>Registered {@link ServerCallback}s are notified when a server disappears; registered
 * {@link SegmentCallback}s are notified when a segment is added to or removed from a server. Each callback is
 * dispatched on the {@link Executor} it was registered with, and is unregistered when it returns
 * {@link CallbackAction#UNREGISTER}.
 *
 * <p>Whenever a known segment is removed from a server, its inventory key is remembered together with a
 * "lifetime" taken from {@code config.getRemovedSegmentLifetime()}. See {@link #lookupSegmentLifetime} and
 * {@link #decrementRemovedSegmentsLifetime()}.
 */
public class ServerInventoryView implements ServerView, InventoryView
{
  private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);

  private final CuratorInventoryManager<DruidServer, DataSegment> inventoryManager;
  private final AtomicBoolean started = new AtomicBoolean(false);

  private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

  // Inventory key of a recently removed segment -> remaining lifetime.
  // Deliberately per-instance: the lifetimes come from this instance's config, so sharing the map between
  // instances would let independent views overwrite and decrement each other's entries.
  private final ConcurrentMap<String, Integer> removedSegments = new MapMaker().makeMap();

  /**
   * Builds the view and the underlying inventory manager. Nothing is started until {@link #start()} is called.
   *
   * @param config     supplies the lifetime assigned to a segment when it is removed
   * @param zkPaths    supplies the announcements (container) path and the served-segments (inventory) path
   * @param curator    handed to the {@link CuratorInventoryManager}
   * @param exec       handed to the {@link CuratorInventoryManager}
   * @param jsonMapper used to serialize and deserialize {@link DruidServer} and {@link DataSegment} payloads
   */
  public ServerInventoryView(
      final ServerInventoryViewConfig config,
      final ZkPathsConfig zkPaths,
      final CuratorFramework curator,
      final ExecutorService exec,
      final ObjectMapper jsonMapper
  )
  {
    inventoryManager = new CuratorInventoryManager<DruidServer, DataSegment>(
        curator,
        new InventoryManagerConfig()
        {
          @Override
          public String getContainerPath()
          {
            return zkPaths.getAnnouncementsPath();
          }

          @Override
          public String getInventoryPath()
          {
            return zkPaths.getServedSegmentsPath();
          }
        },
        exec,
        new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
        {
          @Override
          public DruidServer deserializeContainer(byte[] bytes)
          {
            try {
              return jsonMapper.readValue(bytes, DruidServer.class);
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
          }

          @Override
          public byte[] serializeContainer(DruidServer container)
          {
            try {
              return jsonMapper.writeValueAsBytes(container);
            }
            catch (JsonProcessingException e) {
              throw Throwables.propagate(e);
            }
          }

          @Override
          public DataSegment deserializeInventory(byte[] bytes)
          {
            try {
              return jsonMapper.readValue(bytes, DataSegment.class);
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
          }

          @Override
          public byte[] serializeInventory(DataSegment inventory)
          {
            try {
              return jsonMapper.writeValueAsBytes(inventory);
            }
            catch (JsonProcessingException e) {
              throw Throwables.propagate(e);
            }
          }

          @Override
          public void newContainer(DruidServer container)
          {
            // Only logged; no callback is fired for a newly appearing server.
            log.info("New Server[%s]", container);
          }

          @Override
          public void deadContainer(DruidServer deadContainer)
          {
            log.info("Server Disappeared[%s]", deadContainer);
            runServerCallbacks(deadContainer);
          }

          @Override
          public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
          {
            // Carry the segments known for the old server object over to its replacement.
            return newContainer.addDataSegments(oldContainer);
          }

          @Override
          public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
          {
            log.info("Server[%s] added segment[%s]", container.getName(), inventory);
            final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);

            runSegmentCallbacks(
                new Function<SegmentCallback, CallbackAction>()
                {
                  @Override
                  public CallbackAction apply(SegmentCallback input)
                  {
                    return input.segmentAdded(container, inventory);
                  }
                }
            );

            return retVal;
          }

          @Override
          public DruidServer removeInventory(final DruidServer container, String inventoryKey)
          {
            log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);

            // Look the segment up before removing it so the callbacks can be handed the removed segment.
            final DataSegment segment = container.getSegment(inventoryKey);
            final DruidServer retVal = container.removeDataSegment(inventoryKey);

            if (segment == null) {
              log.warn(
                  "Not running callbacks or cleanup for non-existing segment[%s] on server[%s]",
                  inventoryKey,
                  container.getName()
              );
              return retVal;
            }

            runSegmentCallbacks(
                new Function<SegmentCallback, CallbackAction>()
                {
                  @Override
                  public CallbackAction apply(SegmentCallback input)
                  {
                    return input.segmentRemoved(container, segment);
                  }
                }
            );

            // Remember the removal; a repeated removal of the same key resets the lifetime.
            removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
            return retVal;
          }
        }
    );
  }

  /**
   * Returns the remaining lifetime recorded for a removed segment.
   *
   * @param segment the segment to look up; its {@code getIdentifier()} is used as the key
   *
   * @return the remaining lifetime, or 0 if no removal is currently recorded for the segment
   */
  public int lookupSegmentLifetime(DataSegment segment)
  {
    Integer lifetime = removedSegments.get(segment.getIdentifier());
    return (lifetime == null) ? 0 : lifetime;
  }

  /**
   * Lowers the remaining lifetime of every recorded removed segment by one and forgets the entries whose
   * lifetime would drop below zero.
   *
   * <p>Each entry is updated with a compare-and-set style operation: if the entry was rewritten concurrently
   * (for example because the same segment was removed again and got a fresh lifetime), the fresh value is left
   * untouched instead of being overwritten or deleted based on a stale read.
   */
  public void decrementRemovedSegmentsLifetime()
  {
    for (Map.Entry<String, Integer> entry : removedSegments.entrySet()) {
      final String segmentKey = entry.getKey();
      final Integer observed = entry.getValue();
      final int remaining = observed - 1;

      if (remaining < 0) {
        // Only drop the entry if it still holds the value we based the decision on.
        removedSegments.remove(segmentKey, observed);
      } else {
        removedSegments.replace(segmentKey, observed, remaining);
      }
    }
  }

  /**
   * Starts the underlying inventory manager if this view has not been started yet; otherwise does nothing.
   *
   * @throws Exception if the inventory manager fails to start; the view is then still considered not started
   */
  @LifecycleStart
  public void start() throws Exception
  {
    synchronized (started) {
      if (!started.get()) {
        inventoryManager.start();
        started.set(true);
      }
    }
  }

  /**
   * Stops the underlying inventory manager if this view is currently started; otherwise does nothing.
   * The started flag is cleared before the inventory manager is stopped.
   *
   * @throws IOException if the inventory manager fails to stop
   */
  @LifecycleStop
  public void stop() throws IOException
  {
    synchronized (started) {
      if (started.getAndSet(false)) {
        inventoryManager.stop();
      }
    }
  }

  /**
   * @return true if {@link #start()} has completed and {@link #stop()} has not been called since
   */
  public boolean isStarted()
  {
    return started.get();
  }

  /**
   * Delegates to the inventory manager.
   *
   * @param containerKey key of the server to look up
   *
   * @return whatever the inventory manager returns for that key
   */
  @Override
  public DruidServer getInventoryValue(String containerKey)
  {
    return inventoryManager.getInventoryValue(containerKey);
  }

  /**
   * Delegates to the inventory manager.
   *
   * @return the servers currently tracked by the inventory manager
   */
  @Override
  public Iterable<DruidServer> getInventory()
  {
    return inventoryManager.getInventory();
  }

  /**
   * Registers a callback that is notified when a server disappears. Registering the same callback again
   * replaces the executor it is dispatched on.
   *
   * @param exec     executor on which the callback is invoked
   * @param callback callback to register
   */
  @Override
  public void registerServerCallback(Executor exec, ServerCallback callback)
  {
    serverCallbacks.put(callback, exec);
  }

  /**
   * Registers a callback that is notified when a segment is added to or removed from a server. Registering the
   * same callback again replaces the executor it is dispatched on.
   *
   * @param exec     executor on which the callback is invoked
   * @param callback callback to register
   */
  @Override
  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
  {
    segmentCallbacks.put(callback, exec);
  }

  /**
   * Submits {@code fn} for every registered segment callback to that callback's executor and unregisters the
   * callbacks for which {@code fn} returns {@link CallbackAction#UNREGISTER}.
   *
   * @param fn the notification to deliver to each segment callback
   */
  private void runSegmentCallbacks(
      final Function<SegmentCallback, CallbackAction> fn
  )
  {
    for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
      entry.getValue().execute(
          new Runnable()
          {
            @Override
            public void run()
            {
              if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
                segmentCallbacks.remove(entry.getKey());
              }
            }
          }
      );
    }
  }

  /**
   * Submits a {@code serverRemoved} notification for every registered server callback to that callback's
   * executor and unregisters the callbacks that return {@link CallbackAction#UNREGISTER}.
   *
   * @param server the server that disappeared
   */
  private void runServerCallbacks(final DruidServer server)
  {
    for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
      entry.getValue().execute(
          new Runnable()
          {
            @Override
            public void run()
            {
              if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
                serverCallbacks.remove(entry.getKey());
              }
            }
          }
      );
    }
  }
}