blob: 088ec0b6eb8cb69eeb6d756acfe87a2ddeaa6430 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.timeboundary;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
public class TimeBoundaryQueryQueryToolChest
extends QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final byte TIMEBOUNDARY_QUERY = 0x3;
private static final TypeReference<Result<TimeBoundaryResultValue>> TYPE_REFERENCE = new TypeReference<Result<TimeBoundaryResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
@Override
public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> segments)
{
if (segments.size() <= 1) {
return segments;
}
final T min = segments.get(0);
final T max = segments.get(segments.size() - 1);
return Lists.newArrayList(
Iterables.filter(
segments,
new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return (min != null && input.getInterval().overlaps(min.getInterval())) ||
(max != null && input.getInterval().overlaps(max.getInterval()));
}
}
)
);
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> mergeResults(
final QueryRunner<Result<TimeBoundaryResultValue>> runner
)
{
return new BySegmentSkippingQueryRunner<Result<TimeBoundaryResultValue>>(runner)
{
@Override
protected Sequence<Result<TimeBoundaryResultValue>> doRun(
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input
)
{
TimeBoundaryQuery query = (TimeBoundaryQuery) input;
return Sequences.simple(
query.mergeResults(
Sequences.toList(baseRunner.run(query), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
)
);
}
};
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
{
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser6("false");
}
@Override
public Function<Result<TimeBoundaryResultValue>, Result<TimeBoundaryResultValue>> makePreComputeManipulatorFn(
TimeBoundaryQuery query, MetricManipulationFn fn
)
{
return Functions.identity();
}
@Override
public TypeReference<Result<TimeBoundaryResultValue>> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(TimeBoundaryQuery query)
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
final byte[] cacheKey = query.getCacheKey();
return ByteBuffer.allocate(1 + cacheKey.length)
.put(TIMEBOUNDARY_QUERY)
.put(cacheKey)
.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<TimeBoundaryResultValue>, Object> prepareForCache()
{
return new Function<Result<TimeBoundaryResultValue>, Object>()
{
@Override
public Object apply(Result<TimeBoundaryResultValue> input)
{
return Lists.newArrayList(input.getTimestamp().getMillis(), input.getValue());
}
};
}
@Override
public Function<Object, Result<TimeBoundaryResultValue>> pullFromCache()
{
return new Function<Object, Result<TimeBoundaryResultValue>>()
{
@Override
@SuppressWarnings("unchecked")
public Result<TimeBoundaryResultValue> apply(Object input)
{
List<Object> result = (List<Object>) input;
return new Result<>(
new DateTime(result.get(0)),
new TimeBoundaryResultValue(result.get(1))
);
}
};
}
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
};
}
public Ordering<Result<TimeBoundaryResultValue>> getOrdering()
{
return Ordering.natural();
}
}