blob: 4258cd263d543933c5c1c4cbb251acb0fe274023 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.dedup;
import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.apex.malhar.lib.util.PojoUtils;
import org.apache.apex.malhar.lib.util.PojoUtils.Getter;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.netlet.util.Slice;
/**
* Time based deduper will de-duplicate incoming POJO tuples and classify them into the following:
* 1. Unique
* 2. Duplicate
* 3. Expired
*
* Since this is de-duplicating in a stream of tuples, and we cannot store all incoming keys indefinitely,
* we use the concept of expiry, where incoming tuples expire after a specified period of time. In this case,
* we choose to expire an entire bucket of data as a unit. This requires the user to specify the bucketing
* structure in advance in order for the operator to function. Here are the parameters for specifying the
* bucketing structure:
* 1. {@link #expireBefore} (in seconds)- This is the total time period during which a tuple stays in the
* system and blocks any other tuple with the same key.
* 2. {@link #bucketSpan} (in seconds) - This is the unit which describes how large a bucket can be.
* Typically this should be defined depending on the use case. For example, if we have {@link #expireBefore}
* set to 1 hour, then typically we would be clubbing data in the order of minutes, so a {@link #bucketSpan} of
* around 1 minute or 5 minutes would make sense. Note that in this case, the entire data worth 1 minute or
* 5 minutes will expire as a whole. Setting it to 1 minute would make the number of time buckets in the system
* to be 1 hour / 1 minute = 60 buckets. Similarly setting {@link #bucketSpan} to 5 minutes would make number
* of buckets to be 12. Note that having too many or too few buckets could have a performance impact. If unsure,
* set the {@link #bucketSpan} to be ~ sqrt({@link #expireBefore}). This way the number of buckets and bucket span
* are balanced.
* 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry.
* Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry
* to be from that instant = r. The start and end of the expiry window periodically move by the span of a single
* bucket. Refer to {@link MovingBoundaryTimeBucketAssigner} for details.
*
* Additionally, it also needs the following parameters:
* 1. {@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO)
* 2. {@link #timeExpression} - The java expression to extract the time field in the incoming tuple (POJO).
* In case there is no time field in the tuple, system time, when the tuple is processed, will be used.
*
*
* @since 3.5.0
*/
@Evolving
public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements ActivationListener<Context>
{
  // Required properties

  /** Java expression evaluated against the incoming POJO to obtain its key. */
  @NotNull
  private String keyExpression;

  // NOTE(review): @NotNull can never fail on a primitive long; the annotation is retained only because
  // external tooling may read it. Consider a range constraint (e.g. a minimum of 1) instead.
  /** Length of a single time bucket, in seconds. */
  @NotNull
  private long bucketSpan;

  /** Expiry period, in seconds. */
  @NotNull
  private long expireBefore;

  // Optional properties

  /**
   * Java expression evaluated against the incoming POJO to obtain its time. May be left {@code null},
   * in which case {@link #getTime(Object)} uses the system clock.
   */
  private String timeExpression;

  /** Reference instant, in seconds from the epoch. Defaults to the moment this object is constructed. */
  private long referenceInstant = new Instant().getMillis() / 1000;

  // Runtime state: populated in the input port's setup() and in activate(); never serialized.
  private transient Class<?> pojoClass;
  private transient Getter<Object, Long> timeGetter;
  private transient Getter<Object, Object> keyGetter;
  private transient StreamCodec<Object> streamCodec;

  /**
   * Creates the operator and installs a {@link ManagedTimeUnifiedStateImpl} as its managed state.
   */
  public TimeBasedDedupOperator()
  {
    managedState = new ManagedTimeUnifiedStateImpl();
  }

  /**
   * Input port for the POJO tuples to be de-duplicated. The tuple class is read from the port's
   * {@link PortContext#TUPLE_CLASS} attribute during port setup.
   */
  @InputPortFieldAnnotation(schemaRequired = true)
  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
  {
    @Override
    public void setup(PortContext context)
    {
      // Remember the tuple class so that activate() can build the getters for it.
      pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
      streamCodec = getDeduperStreamCodec();
    }

    @Override
    public void process(Object tuple)
    {
      processTuple(tuple);
    }

    @Override
    public StreamCodec<Object> getStreamCodec()
    {
      return streamCodec;
    }
  };

  /**
   * Returns the time associated with the given tuple.
   *
   * When a time expression is configured, the value produced by that expression is returned.
   * Otherwise the current system time in milliseconds is returned; in that mode every call reads the
   * clock afresh, so successive calls for the same tuple may return different values.
   *
   * @param tuple the incoming tuple
   * @return the tuple time, or the current system time if no time expression is configured
   */
  @Override
  protected long getTime(Object tuple)
  {
    if (timeGetter == null) {
      // No time field configured: fall back to processing time instead of failing with an NPE.
      // NOTE(review): assumes the time expression, when present, also yields epoch milliseconds - confirm.
      return System.currentTimeMillis();
    }
    return timeGetter.get(tuple);
  }

  /**
   * Extracts the key of the given tuple using the key expression and converts it to a {@link Slice}
   * through the operator's stream codec.
   *
   * @param tuple the incoming tuple
   * @return the encoded key
   */
  @Override
  protected Slice getKey(Object tuple)
  {
    Object key = keyGetter.get(tuple);
    return streamCodec.toByteArray(key);
  }

  /**
   * Creates the stream codec used by the input port and for key encoding. Subclasses may override
   * this method to supply a different codec.
   *
   * @return a {@link DeduperStreamCodec} built from the key expression
   */
  protected StreamCodec<Object> getDeduperStreamCodec()
  {
    return new DeduperStreamCodec(keyExpression);
  }

  /**
   * Configures a {@link MovingBoundaryTimeBucketAssigner} from the bucketing properties, installs it
   * on the managed state and then delegates to the parent setup.
   *
   * @param context the operator context
   */
  @Override
  public void setup(OperatorContext context)
  {
    MovingBoundaryTimeBucketAssigner timeBucketAssigner = new MovingBoundaryTimeBucketAssigner();
    // The properties are expressed in seconds; Instant expects milliseconds.
    timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan));
    timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore));
    timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000));
    managedState.setTimeBucketAssigner(timeBucketAssigner);
    super.setup(context);
  }

  /**
   * Builds the POJO getters for the key and, if a time expression is configured, for the time.
   *
   * @param context the activation context (unused)
   */
  @Override
  public void activate(Context context)
  {
    if (timeExpression != null) {
      timeGetter = PojoUtils.createGetter(pojoClass, timeExpression, Long.class);
    } else {
      // getTime() falls back to the system clock when no getter is available.
      timeGetter = null;
    }
    keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
  }

  /**
   * Nothing to release on deactivation.
   */
  @Override
  public void deactivate()
  {
  }

  /**
   * Starts an asynchronous lookup of the tuple's key in the managed state, using the tuple's time.
   *
   * @param tuple the incoming tuple
   * @return a future for the value stored under the tuple's key
   */
  @Override
  protected Future<Slice> getAsyncManagedState(Object tuple)
  {
    return ((ManagedTimeUnifiedStateImpl)managedState).getAsync(getTime(tuple), getKey(tuple));
  }

  /**
   * Records the tuple's key in the managed state under the tuple's time. Only the key is of
   * interest, so an empty slice is stored as the value.
   *
   * @param tuple the incoming tuple
   */
  @Override
  protected void putManagedState(Object tuple)
  {
    ((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(null, 0, 0));
  }

  /**
   * @return the key expression
   */
  public String getKeyExpression()
  {
    return keyExpression;
  }

  /**
   * Sets the key expression
   * @param keyExpression java expression to extract the key from the incoming tuple
   */
  public void setKeyExpression(String keyExpression)
  {
    this.keyExpression = keyExpression;
  }

  /**
   * @return the time expression, or {@code null} if none is configured
   */
  public String getTimeExpression()
  {
    return timeExpression;
  }

  /**
   * Sets the time expression
   * @param timeExpression java expression to extract the time from the incoming tuple;
   *                       may be {@code null} to use system time
   */
  public void setTimeExpression(String timeExpression)
  {
    this.timeExpression = timeExpression;
  }

  /**
   * @return the length of a single time bucket (in seconds)
   */
  public long getBucketSpan()
  {
    return bucketSpan;
  }

  /**
   * Sets the length of a single time bucket (in seconds)
   * @param bucketSpan bucket length in seconds
   */
  public void setBucketSpan(long bucketSpan)
  {
    this.bucketSpan = bucketSpan;
  }

  /**
   * @return the expiry time (in seconds)
   */
  public long getExpireBefore()
  {
    return expireBefore;
  }

  /**
   * Sets the expiry time (in seconds). Any event with time before this is considered to be expired.
   * @param expireBefore expiry period in seconds
   */
  public void setExpireBefore(long expireBefore)
  {
    this.expireBefore = expireBefore;
  }

  /**
   * @return the reference instant (in seconds from the epoch)
   */
  public long getReferenceInstant()
  {
    return referenceInstant;
  }

  /**
   * Sets the reference instant (in seconds from the epoch).
   * By default this is the time when the application is started.
   * @param referenceInstant reference instant in seconds from the epoch
   */
  public void setReferenceInstant(long referenceInstant)
  {
    this.referenceInstant = referenceInstant;
  }
}