/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.coordination;

import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.collect.CountingMap;
import com.metamx.druid.index.Segment;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.partition.PartitionHolder;
import com.metamx.druid.query.BySegmentQueryRunner;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.query.segment.SpecificSegmentQueryRunner;
import com.metamx.druid.query.segment.SpecificSegmentSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* Manages the segments loaded on this node and serves queries against them.
* Keeps one VersionedIntervalTimeline per dataSource and implements
* QuerySegmentWalker so the query stack can resolve either whole intervals or
* specific segment descriptors to QueryRunners.
*/
public class ServerManager implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
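// Guards loads into and drops from the per-dataSource timelines below.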
private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
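// One timeline per dataSource, tracking which version of each segment
// interval/partition is currently queryable on this node.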
private final Map<String, VersionedIntervalTimeline<String, Segment>> dataSources;
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();

public ServerManager(
SegmentLoader segmentLoader,
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
ExecutorService exec
)
{
this.segmentLoader = segmentLoader;
this.conglomerate = conglomerate;
this.emitter = emitter;
this.exec = exec;
this.dataSources = new HashMap<String, VersionedIntervalTimeline<String, Segment>>();
}
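
// Snapshot accessors for the per-dataSource byte and segment counters
// maintained by loadSegment() and dropSegment().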
public Map<String, Long> getDataSourceSizes()
{
synchronized (dataSourceSizes) {
return dataSourceSizes.snapshot();
}
}
public Map<String, Long> getDataSourceCounts()
{
synchronized (dataSourceCounts) {
return dataSourceCounts.snapshot();
}
}
public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
{
return segmentLoader.isSegmentLoaded(segment);
}
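
/**
* Fetches the segment through the SegmentLoader and registers it in its
* dataSource's timeline. If the loader fails, its local copy is cleaned up
* and the original exception is rethrown; loading a segment that is already
* present is reported as an error.
*/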
public void loadSegment(final DataSegment segment) throws SegmentLoadingException
{
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
}
catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
}
catch (SegmentLoadingException e1) {
// ignore
}
throw e;
}
if (adapter == null) {
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
}
synchronized (lock) {
String dataSource = segment.getDataSource();
VersionedIntervalTimeline<String, Segment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
loadedIntervals = new VersionedIntervalTimeline<String, Segment>(Ordering.natural());
dataSources.put(dataSource, loadedIntervals);
}
PartitionHolder<Segment> entry = loadedIntervals.findEntry(
segment.getInterval(),
segment.getVersion()
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier());
}
loadedIntervals.add(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(adapter)
);
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L);
}
}
}
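
/**
* Removes the segment from its dataSource's timeline, adjusting the size and
* count metrics when the segment was actually present, then asks the
* SegmentLoader to clean up the local copy. An unknown dataSource is a no-op.
*/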
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
{
String dataSource = segment.getDataSource();
synchronized (lock) {
VersionedIntervalTimeline<String, Segment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
return;
}
PartitionChunk<Segment> removed = loadedIntervals.remove(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk((Segment) null)
);
Segment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, -segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L);
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
dataSource,
segment.getInterval(),
segment.getVersion()
);
}
}
segmentLoader.cleanup(segment);
}
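
/**
* Resolves the requested intervals against the dataSource's timeline and
* builds one decorated runner per overlapping partition chunk; results are
* merged and finalized through the factory's toolchest.
*/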
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, Segment> timeline = dataSources.get(query.getDataSource());
if (timeline == null) {
return new NoopQueryRunner<T>();
}
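// For each requested interval, look up the overlapping timeline holders, then
// fan each holder out into one decorated runner per partition chunk, pinned to
// the chunk's exact SegmentDescriptor via SpecificSegmentSpec.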
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, Segment>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, Segment>> apply(Interval input)
{
return timeline.lookup(input);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, Segment>, Iterable<QueryRunner<T>>>()
{
@Override
public Iterable<QueryRunner<T>> apply(@Nullable final TimelineObjectHolder<String, Segment> holder)
{
if (holder == null) {
return null;
}
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<Segment>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(PartitionChunk<Segment> input)
{
return buildAndDecorateQueryRunner(
factory,
toolChest,
input.getObject(),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
)
);
}
}
)
.filter(Predicates.<QueryRunner<T>>notNull());
}
}
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
}
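
/**
* Like getQueryRunnerForIntervals, but resolves an explicit list of segment
* descriptors (interval + version + partition number). Descriptors that no
* longer match a loaded chunk yield no runner.
*/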
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
log.makeAlert("Unknown query type, [%s]", query.getClass())
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<T>();
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final VersionedIntervalTimeline<String, Segment> timeline = dataSources.get(query.getDataSource());
if (timeline == null) {
return new NoopQueryRunner<T>();
}
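// Resolve each descriptor against the timeline; a missing entry or chunk means
// the segment is not served here and contributes no runner.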
FunctionalIterable<QueryRunner<T>> adapters = FunctionalIterable
.create(specs)
.transformCat(
new Function<SegmentDescriptor, Iterable<QueryRunner<T>>>()
{
@Override
@SuppressWarnings("unchecked")
public Iterable<QueryRunner<T>> apply(@Nullable SegmentDescriptor input)
{
final PartitionHolder<Segment> entry = timeline.findEntry(
input.getInterval(), input.getVersion()
);
if (entry == null) {
return null;
}
final PartitionChunk<Segment> chunk = entry.getChunk(input.getPartitionNumber());
if (chunk == null) {
return null;
}
final Segment adapter = chunk.getObject();
return Arrays.asList(
buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
);
}
}
);
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
}
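
/**
* Wraps the factory's per-segment runner, innermost first: BySegmentQueryRunner
* (per-segment result wrapping), MetricsEmittingQueryRunner (emits query timing
* metrics via the toolchest's metric builder), and SpecificSegmentQueryRunner
* (pins execution to the given segment spec).
*/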
private <T> QueryRunner<T> buildAndDecorateQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
Segment adapter,
QuerySegmentSpec segmentSpec
)
{
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)
).withWaitMeasuredFromNow(),
segmentSpec
);
}
}