blob: c377b96260726d1a6d29859ee56afd30422d82e2 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.contrib.enrich;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.NotNull;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.cache.CacheManager;
import com.datatorrent.lib.db.cache.CacheStore;
import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
/**
 * Base class for enrichment operators.  Subclasses should provide implementations of getKey and convert.
 * The operator receives a tuple and emits an enriched tuple based on includeFields and lookupFields. <br/>
 * <p>
 * Properties:<br>
 * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
 * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
 * <b>store</b>: Specifies the type of loader used for looking up data<br>
 * <br>
 * </p>
 *
 * @param <INPUT> Type of tuples which are received by this operator
 * @param <OUTPUT> Type of tuples which are emitted by this operator
 * @displayName Abstract Enrichment Operator
 * @tags Enrichment
 * @since 3.4.0
 */
// Abstract base for enrichment operators, generic over the received tuple type (INPUT) and the emitted
// tuple type (OUTPUT). Extends BaseOperator and implements Operator.ActivationListener.
// NOTE(review): in this copy of the file the class-body braces and the comment delimiters around the
// three section headings below are missing; the lines are left exactly as found — restore them from the
// upstream source before compiling.
public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
* Mandatory parameters for Enricher
// Names of the fields used for lookup; iterated (without a null check) in activate().
protected List<String> lookupFields;
// Names of the fields used to enrich the tuple; activate() tolerates this being null.
protected List<String> includeFields;
// Backend loader; activate() passes the lookup/include FieldInfo lists to it via setFieldInfo.
private BackendLoader store;
* Optional parameters for enricher.
// Cache entry expiration interval in ms; default is 3,600,000 ms (1 hour).
private long cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour
// Cache cleanup interval in ms; default is 3,600,000 ms (1 hour).
private long cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
// Expiry strategy for cache entries; defaults to EXPIRE_AFTER_WRITE.
private ExpiryType expiryType = ExpiryType.EXPIRE_AFTER_WRITE;
// Number of entries allowed in the cache.
// NOTE(review): where the four cache settings above are applied is not visible in this copy (the body of
// setup() is truncated) — presumably they configure the CacheStore created there; confirm.
private int cacheSize = 1024; // 1024 records
* Helper variables.
// FieldInfo descriptors built in activate(); transient, so they are not part of the serialized state.
protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
// Cache manager queried by enrichTuple(); defaults to a NullValuesCacheManager and is replaceable via
// setCacheManager().
private CacheManager cacheManager = new NullValuesCacheManager();
* This method needs to be called by implementing class for processing a tuple for enrichment.
* The method will take the tuple through following stages:
* <ol>
* <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
* <li>Using key fields call cache manager to retrieve for any key that is cached already</li>
* <li>If not found in cache, it'll do a lookup in configured backend store</li>
* <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li>
* <li>Finally {@link #emitEnrichedTuple(Object)} is called for emitting the tuple</li>
* </ol>
* @param tuple Input tuple that needs to get processed for enrichment.
protected void enrichTuple(INPUT tuple)
Object key = getKey(tuple);
if (key != null) {
Object result = cacheManager.get(key);
OUTPUT out = convert(tuple, result);
if (out != null) {
* The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields
* which forms key part of lookup.
* The order of field values should be same as the one set in {@link #lookupFields} variable.
* @param tuple Input tuple from which fields values for key needs to be fetched.
* @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields}
protected abstract Object getKey(INPUT tuple);
* The method should be implemented by concrete class.
* This method is expected to take input tuple and an externally fetched object containing fields to be enriched, and
* return an enriched tuple which is ready to be emitted.
* @param in Input tuple which needs to be enriched.
* @param cached ArrayList<Object> containing missing data retrieved from external sources.
* @return Enriched tuple of type OUTPUT
protected abstract OUTPUT convert(INPUT in, Object cached);
* This method should be implemented by concrete class.
* The method is expected to emit tuple of type OUTPUT
* @param tuple Tuple of type OUTPUT that should be emitted.
protected abstract void emitEnrichedTuple(OUTPUT tuple);
* This method should be implemented by concrete method.
* The method should return Class type of field for given fieldName from output tuple.
* @param fieldName Field name for which field type needs to be identified
* @return Class type for given field.
protected abstract Class<?> getIncludeFieldType(String fieldName);
* This method should be implemented by concrete method.
* The method should return Class type of field for given fieldName from input tuple.
* @param fieldName Field name for which field type needs to be identified
* @return Class type for given field.
protected abstract Class<?> getLookupFieldType(String fieldName);
// Operator setup hook; the only statement visible here creates a new CacheStore named primaryCache.
// NOTE(review): the braces and the rest of this method body are missing from this copy of the file.
// Presumably the remainder applies cacheExpirationInterval, cacheCleanupInterval, expiryType and
// cacheSize to primaryCache and registers it (and the backend store) with cacheManager — confirm against
// the upstream source before relying on this.
public void setup(Context.OperatorContext context)
CacheStore primaryCache = new CacheStore();
// NOTE(review): the wording below says "one day", but the visible default of cacheExpirationInterval is
// 1 hour — TODO confirm which is intended and fix the comment.
// set expiration to one day.
// Activation hook: builds the FieldInfo lists for lookup and include fields and hands them to the store.
// NOTE(review): closing braces and the statement inside the try block are missing from this copy of the
// file; the code lines below are left exactly as found.
public void activate(Context context)
// One FieldInfo per lookup field: the field name is passed as both of the first two constructor
// arguments, and the type comes from the subclass via getLookupFieldType. lookupFields is not
// null-checked here.
for (String s : lookupFields) {
lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s))));
// includeFields may be null; in that case nothing is added to includeFieldInfo.
if (includeFields != null) {
for (String s : includeFields) {
includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s))));
// Pass both field descriptions to the backend store.
store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
// NOTE(review): the try body is not visible — judging by the error message it presumably initializes
// cacheManager; confirm against the upstream source. An IOException is rethrown unchecked with its cause.
try {
} catch (IOException e) {
throw new RuntimeException("Unable to initialize cache manager", e);
// Deactivation hook.
// NOTE(review): the braces and the statement inside the try block are missing from this copy of the
// file — judging by the error message it presumably closes cacheManager; confirm against the upstream
// source. An IOException is rethrown as a RuntimeException that keeps the original cause.
public void deactivate()
try {
} catch (IOException e) {
throw new RuntimeException("Unable to close cache manager", e);
* Returns a list of fields which are used for lookup.
* @return List of fields
public List<String> getLookupFields()
return lookupFields;
* Set fields on which lookup needs to happen in external store.
* This is a mandatory parameter.
* @param lookupFields List of fields on which lookup happens.
public void setLookupFields(List<String> lookupFields)
this.lookupFields = lookupFields;
* Returns a list of fields using which tuple is enriched
* @return List of fields.
public List<String> getIncludeFields()
return includeFields;
* Sets list of fields to be fetched from external store for enriching the tuple.
* This is a mandatory parameter.
* @param includeFields List of fields.
public void setIncludeFields(List<String> includeFields)
this.includeFields = includeFields;
* Returns the backend store which will enrich the tuple.
* @return Object of type {@link BackendLoader}
public BackendLoader getStore()
return store;
* Sets backend store which will enrich the tuple.
* This is a mandatory parameter.
* @param store Object of type {@link BackendLoader}
public void setStore(BackendLoader store)
{ = store;
* Returns cache entry expiration interval in ms.
* This is an optional parameter.
* @return Cache entry expiration interval in ms
public long getCacheExpirationInterval()
return cacheExpirationInterval;
* Sets cache entry expiration interval in ms.
* This is an optional parameter.
* @param cacheExpirationInterval Cache entry expiration interval in ms
public void setCacheExpirationInterval(long cacheExpirationInterval)
this.cacheExpirationInterval = cacheExpirationInterval;
* Returns cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
* This is an optional parameter.
* @return cache cleanup interval in ms.
public long getCacheCleanupInterval()
return cacheCleanupInterval;
* Set Cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
* This is an optional parameter.
* @param cacheCleanupInterval cache cleanup interval in ms.
public void setCacheCleanupInterval(long cacheCleanupInterval)
this.cacheCleanupInterval = cacheCleanupInterval;
* Get size (number of entries) of cache.
* @return Number of entries allowed in cache.
public int getCacheSize()
return cacheSize;
* Set size (number of entries) of cache.
* @param cacheSize Number of entries allowed in cache.
public void setCacheSize(int cacheSize)
this.cacheSize = cacheSize;
public ExpiryType getExpiryType()
return expiryType;
public void setExpiryType(ExpiryType expiryType)
this.expiryType = expiryType;
public CacheManager getCacheManager()
return cacheManager;
public void setCacheManager(CacheManager cacheManager)
this.cacheManager = cacheManager;