blob: 509c4a5c7931f55e8a56bdefdfbe05d02dc0d57a [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
*/
@ManageLifecycle
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>> implements FilteredServerView
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
final private ConcurrentMap<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
final private Predicate<DataSegment> defaultFilter;
@Inject
public BatchServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final Predicate<DataSegment> defaultFilter
)
{
super(
log,
zkPaths.getAnnouncementsPath(),
zkPaths.getLiveSegmentsPath(),
curator,
jsonMapper,
new TypeReference<Set<DataSegment>>(){}
);
Preconditions.checkNotNull(defaultFilter);
this.defaultFilter = defaultFilter;
}
@Override
protected DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
final Set<DataSegment> inventory
)
{
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
zNodes.put(inventoryKey, filteredInventory);
for (DataSegment segment : filteredInventory) {
addSingleInventory(container, segment);
}
return container;
}
@Override
protected DruidServer updateInnerInventory(
DruidServer container, String inventoryKey, Set<DataSegment> inventory
)
{
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
}
for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
addSingleInventory(container, segment);
}
for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
removeSingleInventory(container, segment.getIdentifier());
}
zNodes.put(inventoryKey, filteredInventory);
return container;
}
@Override
protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
{
log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
Set<DataSegment> segments = zNodes.remove(inventoryKey);
if (segments == null) {
log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
return container;
}
for (DataSegment segment : segments) {
removeSingleInventory(container, segment.getIdentifier());
}
return container;
}
@Override
public void registerSegmentCallback(
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
)
{
segmentPredicates.put(callback, filter);
registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action;
if(filter.apply(segment)) {
action = callback.segmentAdded(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
@Override
public CallbackAction segmentRemoved(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action;
if(filter.apply(segment)) {
action = callback.segmentRemoved(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
}
);
}
}