blob: c340544e9084747e1ce4f4b295cbe32ca2087df8 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.logger.Logger;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.selector.QueryableDruidServer;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.http.client.HttpClient;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
 * A {@link TimelineServerView} that keeps track of which servers serve which segments.
 *
 * <p>On construction it registers a segment callback and a server callback on the supplied base
 * {@link ServerView} and uses them to maintain three pieces of state:
 * <ul>
 * <li>{@code clients}: server name to the {@link QueryableDruidServer} created for that server,</li>
 * <li>{@code selectors}: segment identifier to the {@link ServerSelector} holding the servers for that segment,</li>
 * <li>{@code timelines}: data source name to a {@link VersionedIntervalTimeline} of those selectors.</li>
 * </ul>
 *
 * <p>{@code selectors} and {@code timelines} are plain hash maps; every access to them in this class is made
 * while holding {@code lock}. {@code clients} is a concurrent map.
 */
public class BrokerServerView implements TimelineServerView
{
  private static final Logger log = new Logger(BrokerServerView.class);

  /** Guards {@link #selectors} and {@link #timelines}. */
  private final Object lock = new Object();

  /** Server name to queryable server. */
  private final ConcurrentMap<String, QueryableDruidServer> clients;
  /** Segment identifier to the selector of servers for that segment. */
  private final Map<String, ServerSelector> selectors;
  /** Data source name to the timeline of selectors for that data source. */
  private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;

  private final QueryToolChestWarehouse warehouse;
  private final ObjectMapper smileMapper;
  private final HttpClient httpClient;
  private final ServerView baseView;

  /**
   * Creates the view and subscribes it to segment and server events of {@code baseView}.
   *
   * @param warehose    passed to every {@link DirectDruidClient} this view creates
   * @param smileMapper passed to every {@link DirectDruidClient} this view creates
   * @param httpClient  passed to every {@link DirectDruidClient} this view creates
   * @param baseView    the view whose callbacks drive this one; external callback registrations are
   *                    delegated to it as well
   * @param exec        executor handed to {@code baseView} together with the callbacks registered here
   */
  public BrokerServerView(
      QueryToolChestWarehouse warehose,
      ObjectMapper smileMapper,
      HttpClient httpClient,
      ServerView baseView,
      ExecutorService exec
  )
  {
    this.warehouse = warehose;
    this.smileMapper = smileMapper;
    this.httpClient = httpClient;
    this.baseView = baseView;

    this.clients = Maps.newConcurrentMap();
    this.selectors = Maps.newHashMap();
    this.timelines = Maps.newHashMap();

    baseView.registerSegmentCallback(
        exec,
        new ServerView.SegmentCallback()
        {
          @Override
          public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
          {
            serverAddedSegment(server, segment);
            return ServerView.CallbackAction.CONTINUE;
          }

          @Override
          public ServerView.CallbackAction segmentRemoved(final DruidServer server, DataSegment segment)
          {
            serverRemovedSegment(server, segment);
            return ServerView.CallbackAction.CONTINUE;
          }
        }
    );

    baseView.registerServerCallback(
        exec,
        new ServerView.ServerCallback()
        {
          @Override
          public ServerView.CallbackAction serverRemoved(DruidServer server)
          {
            removeServer(server);
            return ServerView.CallbackAction.CONTINUE;
          }
        }
    );
  }

  /**
   * Drops all known clients, timelines and selectors, and empties every selector of its servers.
   */
  public void clear()
  {
    synchronized (lock) {
      // Clear the map directly: Iterator.remove() is only legal after next(), so a bare
      // hasNext()/remove() loop throws IllegalStateException on the first entry.
      clients.clear();
      timelines.clear();

      final Iterator<ServerSelector> selectorsIter = selectors.values().iterator();
      while (selectorsIter.hasNext()) {
        final ServerSelector selector = selectorsIter.next();
        selectorsIter.remove();
        // Drain the selector so that anything still holding a reference to it sees no servers.
        while (!selector.isEmpty()) {
          final QueryableDruidServer pick = selector.pick();
          selector.removeServer(pick);
        }
      }
    }
  }

  /**
   * Creates a {@link QueryableDruidServer} for {@code server} and registers it under the server's name,
   * replacing (and warning about) any entry that was already registered under that name.
   *
   * @return the newly registered queryable server, never {@code null}
   */
  private QueryableDruidServer addServer(DruidServer server)
  {
    final QueryableDruidServer created = new QueryableDruidServer(server, makeDirectClient(server));
    // Map.put returns the PREVIOUS mapping (null for a new server), so it must not be used as the result.
    final QueryableDruidServer previous = clients.put(server.getName(), created);
    if (previous != null) {
      log.warn("QueryRunner for server[%s] already existed!?", server);
    }
    return created;
  }

  /**
   * Builds the client used to query {@code server}, addressed by the server's host.
   */
  private DirectDruidClient makeDirectClient(DruidServer server)
  {
    return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost());
  }

  /**
   * Detaches every segment of {@code server} from its selector and then forgets the server.
   *
   * @return the queryable server that was registered under the server's name, or {@code null} if none was
   */
  private QueryableDruidServer removeServer(DruidServer server)
  {
    // Segments must be detached while the server is still present in `clients`:
    // serverRemovedSegment looks the server up by name to know what to remove from each selector.
    for (DataSegment segment : server.getSegments().values()) {
      serverRemovedSegment(server, segment);
    }
    return clients.remove(server.getName());
  }

  /**
   * Records that {@code server} now serves {@code segment}, creating the segment's selector, the data
   * source's timeline and the server's client on first sight of each.
   */
  private void serverAddedSegment(final DruidServer server, final DataSegment segment)
  {
    String segmentId = segment.getIdentifier();
    synchronized (lock) {
      log.debug("Adding segment[%s] for server[%s]", segment, server);

      ServerSelector selector = selectors.get(segmentId);
      if (selector == null) {
        // First server announcing this segment: create its selector and put it on the timeline.
        selector = new ServerSelector(segment);

        VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
        if (timeline == null) {
          timeline = new VersionedIntervalTimeline<String, ServerSelector>(Ordering.natural());
          timelines.put(segment.getDataSource(), timeline);
        }

        timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
        selectors.put(segmentId, selector);
      }

      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
      if (queryableDruidServer == null) {
        queryableDruidServer = addServer(server);
      }
      selector.addServer(queryableDruidServer);
    }
  }

  /**
   * Records that {@code server} no longer serves {@code segment}. When the segment's selector is left
   * without servers, the selector is dropped and its entry is removed from the data source's timeline.
   * Unknown segments and unknown server/segment associations are logged and otherwise ignored.
   */
  private void serverRemovedSegment(DruidServer server, DataSegment segment)
  {
    String segmentId = segment.getIdentifier();

    synchronized (lock) {
      log.debug("Removing segment[%s] from server[%s].", segmentId, server);

      final ServerSelector selector = selectors.get(segmentId);
      if (selector == null) {
        log.warn("Told to remove non-existant segment[%s]", segmentId);
        return;
      }

      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
      if (!selector.removeServer(queryableDruidServer)) {
        log.warn(
            "Asked to disassociate non-existant association between server[%s] and segment[%s]",
            server,
            segmentId
        );
      }

      if (selector.isEmpty()) {
        // No server serves this segment any more: remove it from the lookup map and the timeline.
        VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
        selectors.remove(segmentId);

        final PartitionChunk<ServerSelector> removedPartition = timeline.remove(
            segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)
        );

        if (removedPartition == null) {
          log.warn(
              "Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist",
              segment.getInterval(),
              segment.getVersion()
          );
        }
      }
    }
  }

  /**
   * @return the timeline for {@code dataSource}, or {@code null} if no segment of it has been added
   */
  @Override
  public VersionedIntervalTimeline<String, ServerSelector> getTimeline(String dataSource)
  {
    synchronized (lock) {
      return timelines.get(dataSource);
    }
  }

  /**
   * @return the client registered for {@code server}, or {@code null} (after logging an error) if the
   *         server is not known to this view
   */
  @Override
  public <T> QueryRunner<T> getQueryRunner(DruidServer server)
  {
    synchronized (lock) {
      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
      if (queryableDruidServer == null) {
        log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
        return null;
      }
      return queryableDruidServer.getClient();
    }
  }

  /**
   * Delegates the registration to the base view.
   */
  @Override
  public void registerServerCallback(Executor exec, ServerCallback callback)
  {
    baseView.registerServerCallback(exec, callback);
  }

  /**
   * Delegates the registration to the base view.
   */
  @Override
  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
  {
    baseView.registerSegmentCallback(exec, callback);
  }
}