| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.query; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Function; |
| import org.apache.druid.data.input.MapBasedRow; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.query.context.ResponseContext; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.query.timeboundary.TimeBoundaryQuery; |
| import org.apache.druid.query.timeboundary.TimeBoundaryResultValue; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Interval; |
| import org.joda.time.Period; |
| |
| import java.util.Collections; |
| |
| /** |
| * TimewarpOperator is an example post-processing operator that maps current time |
| * to the latest period ending withing the specified data interval and truncates |
| * the query interval to discard data that would be mapped to the future. |
| */ |
| public class TimewarpOperator<T> implements PostProcessingOperator<T> |
| { |
| private final Interval dataInterval; |
| private final long periodMillis; |
| private final long originMillis; |
| |
| /** |
| * @param dataInterval interval containing the actual data |
| * @param period time will be offset by a multiple of the given period |
| * until there is at least a full period ending within the data interval |
| * @param origin origin to be used to align time periods |
| * (e.g. to determine on what day of the week a weekly period starts) |
| */ |
| @JsonCreator |
| public TimewarpOperator( |
| @JsonProperty("dataInterval") Interval dataInterval, |
| @JsonProperty("period") Period period, |
| @JsonProperty("origin") DateTime origin |
| ) |
| { |
| this.originMillis = origin.getMillis(); |
| this.dataInterval = dataInterval; |
| // this will fail for periods that do not map to millis (e.g. P1M) |
| this.periodMillis = period.toStandardDuration().getMillis(); |
| } |
| |
| @Override |
| public QueryRunner<T> postProcess(QueryRunner<T> baseQueryRunner) |
| { |
| return postProcess(baseQueryRunner, DateTimes.nowUtc().getMillis()); |
| } |
| |
| public QueryRunner<T> postProcess(final QueryRunner<T> baseRunner, final long now) |
| { |
| return new QueryRunner<T>() |
| { |
| @Override |
| public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext) |
| { |
| final DateTimeZone tz = queryPlus.getQuery().getTimezone(); |
| final long offset = computeOffset(now, tz); |
| |
| final Interval interval = queryPlus.getQuery().getIntervals().get(0); |
| final Interval modifiedInterval = new Interval( |
| Math.min(interval.getStartMillis() + offset, now + offset), |
| Math.min(interval.getEndMillis() + offset, now + offset), |
| interval.getChronology() |
| ); |
| return Sequences.map( |
| baseRunner.run( |
| queryPlus.withQuery( |
| queryPlus.getQuery().withQuerySegmentSpec( |
| new MultipleIntervalSegmentSpec(Collections.singletonList(modifiedInterval)) |
| ) |
| ), |
| responseContext |
| ), |
| new Function<T, T>() |
| { |
| @Override |
| public T apply(T input) |
| { |
| if (input instanceof Result) { |
| Result res = (Result) input; |
| Object value = res.getValue(); |
| if (value instanceof TimeBoundaryResultValue) { |
| TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value; |
| |
| DateTime minTime; |
| try { |
| minTime = boundary.getMinTime(); |
| } |
| catch (IllegalArgumentException e) { |
| minTime = null; |
| } |
| |
| final DateTime maxTime = boundary.getMaxTime(); |
| |
| return (T) ((TimeBoundaryQuery) queryPlus.getQuery()).buildResult( |
| DateTimes.utc(Math.min(res.getTimestamp().getMillis() - offset, now)), |
| minTime != null ? minTime.minus(offset) : null, |
| maxTime != null ? DateTimes.utc(Math.min(maxTime.getMillis() - offset, now)) : null |
| ).iterator().next(); |
| } |
| return (T) new Result(res.getTimestamp().minus(offset), value); |
| } else if (input instanceof MapBasedRow) { |
| MapBasedRow row = (MapBasedRow) input; |
| return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent()); |
| } |
| |
| // default to noop for unknown result types |
| return input; |
| } |
| } |
| ); |
| } |
| }; |
| } |
| |
| /** |
| * Map time t into the last `period` ending within `dataInterval` |
| * |
| * @param t the current time to be mapped into `dataInterval` |
| * |
| * @return the offset between the mapped time and time t |
| */ |
| protected long computeOffset(final long t, final DateTimeZone tz) |
| { |
| // start is the beginning of the last period ending within dataInterval |
| long start = dataInterval.getEndMillis() - periodMillis; |
| long startOffset = start % periodMillis - originMillis % periodMillis; |
| if (startOffset < 0) { |
| startOffset += periodMillis; |
| } |
| |
| start -= startOffset; |
| |
| // tOffset is the offset time t within the last period |
| long tOffset = t % periodMillis - originMillis % periodMillis; |
| if (tOffset < 0) { |
| tOffset += periodMillis; |
| } |
| tOffset += start; |
| return tOffset - t - (tz.getOffset(tOffset) - tz.getOffset(t)); |
| } |
| } |