blob: c377b96260726d1a6d29859ee56afd30422d82e2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.enrich;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.NotNull;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.cache.CacheManager;
import com.datatorrent.lib.db.cache.CacheStore;
import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
/**
* Base class for Enrichment Operator.  Subclasses should provide implementation to getKey and convert.
* The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
* <p>
* Properties:<br>
* <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
* <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
* <b>store</b>: Backend loader used for looking up the data to enrich with<br>
* <br>
*
* @param <INPUT> Type of tuples which are received by this operator
* @param <OUTPUT> Type of tuples which are emitted by this operator
* @displayName Abstract Enrichment Operator
* @tags Enrichment
*
* @since 3.4.0
*/
@InterfaceStability.Evolving
public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener
{
  /**
   * Mandatory: names of the fields that form the lookup key.
   * <p>
   * NOTE(review): the {@code NotNull} annotation imported by this file is
   * {@code com.esotericsoftware.kryo.NotNull}, not a bean-validation constraint. Confirm that the
   * mandatory properties of this class are actually validated as intended.
   */
  @NotNull
  protected List<String> lookupFields;

  /**
   * Mandatory: names of the fields that are fetched from the store and added to / replaced in the tuple.
   */
  @NotNull
  protected List<String> includeFields;

  /**
   * Mandatory: backend loader which is registered as the backup store of the cache manager.
   */
  @NotNull
  private BackendLoader store;

  /**
   * Optional parameters for enricher.
   */
  private long cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour
  private long cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
  private ExpiryType expiryType = ExpiryType.EXPIRE_AFTER_WRITE;
  private int cacheSize = 1024; // 1024 records

  /**
   * Helper variables. Both lists are rebuilt from {@link #lookupFields} and {@link #includeFields}
   * every time the operator is activated.
   */
  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();

  private CacheManager cacheManager = new NullValuesCacheManager();

  /**
   * This method needs to be called by implementing class for processing a tuple for enrichment.
   * The method will take the tuple through following stages:
   * <ol>
   * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li>
   * <li>Ask the cache manager for the value associated with that key (the cache manager is configured
   * in {@link #setup(Context.OperatorContext)} with a primary cache and the backend store as backup)</li>
   * <li>Pass the retrieved value to {@link #convert(Object, Object)} to create the final object</li>
   * <li>Finally call {@link #emitEnrichedTuple(Object)} for emitting the tuple</li>
   * </ol>
   * Nothing is emitted when {@link #getKey(Object)} or {@link #convert(Object, Object)} returns {@code null}.
   *
   * @param tuple Input tuple that needs to get processed for enrichment.
   */
  protected void enrichTuple(INPUT tuple)
  {
    Object key = getKey(tuple);
    if (key == null) {
      // Without a key there is nothing to look up; the tuple is not emitted.
      return;
    }

    OUTPUT enriched = convert(tuple, cacheManager.get(key));
    if (enriched != null) {
      emitEnrichedTuple(enriched);
    }
  }

  /**
   * The method should be implemented by concrete class which returns an {@code ArrayList<Object>} containing all
   * the fields which forms key part of lookup.
   * The order of field values should be same as the one set in {@link #lookupFields} variable.
   *
   * @param tuple Input tuple from which fields values for key needs to be fetched.
   * @return Should return {@code ArrayList<Object>} which has fields values forming keys in same order as
   *         {@link #lookupFields}; a {@code null} return makes {@link #enrichTuple(Object)} skip the tuple.
   */
  protected abstract Object getKey(INPUT tuple);

  /**
   * The method should be implemented by concrete class.
   * This method is expected to take input tuple and an externally fetched object containing fields to be enriched,
   * and return an enriched tuple which is ready to be emitted.
   *
   * @param in     Input tuple which needs to be enriched.
   * @param cached {@code ArrayList<Object>} containing missing data retrieved from external sources.
   * @return Enriched tuple of type OUTPUT; a {@code null} return makes {@link #enrichTuple(Object)} emit nothing.
   */
  protected abstract OUTPUT convert(INPUT in, Object cached);

  /**
   * This method should be implemented by concrete class.
   * The method is expected to emit tuple of type OUTPUT
   *
   * @param tuple Tuple of type OUTPUT that should be emitted.
   */
  protected abstract void emitEnrichedTuple(OUTPUT tuple);

  /**
   * This method should be implemented by concrete class.
   * The method should return Class type of field for given fieldName from output tuple.
   *
   * @param fieldName Field name for which field type needs to be identified
   * @return Class type for given field.
   */
  protected abstract Class<?> getIncludeFieldType(String fieldName);

  /**
   * This method should be implemented by concrete class.
   * The method should return Class type of field for given fieldName from input tuple.
   *
   * @param fieldName Field name for which field type needs to be identified
   * @return Class type for given field.
   */
  protected abstract Class<?> getLookupFieldType(String fieldName);

  /**
   * Creates the primary cache from the configured cache properties and wires it, together with the
   * backend store, into the cache manager.
   *
   * @param context Operator context, forwarded to the super class.
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);

    // All cache settings come from the operator properties (expiration and cleanup default to 1 hour).
    CacheStore primaryCache = new CacheStore();
    primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
    primaryCache.setCacheCleanupInMillis(cacheCleanupInterval);
    primaryCache.setEntryExpiryStrategy(expiryType);
    primaryCache.setMaxCacheSize(cacheSize);

    cacheManager.setPrimary(primaryCache);
    cacheManager.setBackup(store);
  }

  /**
   * Builds the {@link FieldInfo} descriptions for the lookup and include fields, hands them to the
   * backend store and initializes the cache manager.
   *
   * @param context Activation context (not used by this implementation).
   * @throws RuntimeException if the cache manager fails to initialize with an {@link IOException}.
   */
  @Override
  public void activate(Context context)
  {
    // Start from fresh lists on every activation: appending to the lists of a previous activation
    // would hand duplicated field descriptions to the store.
    lookupFieldInfo = new ArrayList<>();
    includeFieldInfo = new ArrayList<>();

    for (String s : lookupFields) {
      lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s))));
    }

    if (includeFields != null) {
      for (String s : includeFields) {
        includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s))));
      }
    }

    store.setFieldInfo(lookupFieldInfo, includeFieldInfo);

    try {
      cacheManager.initialize();
    } catch (IOException e) {
      throw new RuntimeException("Unable to initialize cache manager", e);
    }
  }

  /**
   * Closes the cache manager.
   *
   * @throws RuntimeException if closing the cache manager fails with an {@link IOException}.
   */
  @Override
  public void deactivate()
  {
    try {
      cacheManager.close();
    } catch (IOException e) {
      throw new RuntimeException("Unable to close cache manager", e);
    }
  }

  /**
   * Returns a list of fields which are used for lookup.
   *
   * @return List of fields
   */
  public List<String> getLookupFields()
  {
    return lookupFields;
  }

  /**
   * Set fields on which lookup needs to happen in external store.
   * This is a mandatory parameter.
   *
   * @param lookupFields List of fields on which lookup happens.
   */
  public void setLookupFields(List<String> lookupFields)
  {
    this.lookupFields = lookupFields;
  }

  /**
   * Returns a list of fields using which tuple is enriched
   *
   * @return List of fields.
   */
  public List<String> getIncludeFields()
  {
    return includeFields;
  }

  /**
   * Sets list of fields to be fetched from external store for enriching the tuple.
   * This is a mandatory parameter.
   *
   * @param includeFields List of fields.
   */
  public void setIncludeFields(List<String> includeFields)
  {
    this.includeFields = includeFields;
  }

  /**
   * Returns the backend store which will enrich the tuple.
   *
   * @return Object of type {@link BackendLoader}
   */
  public BackendLoader getStore()
  {
    return store;
  }

  /**
   * Sets backend store which will enrich the tuple.
   * This is a mandatory parameter.
   *
   * @param store Object of type {@link BackendLoader}
   */
  public void setStore(BackendLoader store)
  {
    this.store = store;
  }

  /**
   * Returns cache entry expiration interval in ms.
   * This is an optional parameter.
   *
   * @return Cache entry expiration interval in ms
   */
  public long getCacheExpirationInterval()
  {
    return cacheExpirationInterval;
  }

  /**
   * Sets cache entry expiration interval in ms.
   * This is an optional parameter.
   *
   * @param cacheExpirationInterval Cache entry expiration interval in ms
   */
  public void setCacheExpirationInterval(long cacheExpirationInterval)
  {
    this.cacheExpirationInterval = cacheExpirationInterval;
  }

  /**
   * Returns cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
   * This is an optional parameter.
   *
   * @return cache cleanup interval in ms.
   */
  public long getCacheCleanupInterval()
  {
    return cacheCleanupInterval;
  }

  /**
   * Set Cache cleanup interval in ms. After this interval, cache cleanup operation will be performed.
   * This is an optional parameter.
   *
   * @param cacheCleanupInterval cache cleanup interval in ms.
   */
  public void setCacheCleanupInterval(long cacheCleanupInterval)
  {
    this.cacheCleanupInterval = cacheCleanupInterval;
  }

  /**
   * Get size (number of entries) of cache.
   *
   * @return Number of entries allowed in cache.
   */
  public int getCacheSize()
  {
    return cacheSize;
  }

  /**
   * Set size (number of entries) of cache.
   *
   * @param cacheSize Number of entries allowed in cache.
   */
  public void setCacheSize(int cacheSize)
  {
    this.cacheSize = cacheSize;
  }

  /**
   * Returns the expiry strategy that is applied to the primary cache during setup.
   * This is an optional parameter; the default is {@link ExpiryType#EXPIRE_AFTER_WRITE}.
   *
   * @return Expiry strategy of cache entries.
   */
  public ExpiryType getExpiryType()
  {
    return expiryType;
  }

  /**
   * Sets the expiry strategy that is applied to the primary cache during setup.
   * This is an optional parameter.
   *
   * @param expiryType Expiry strategy of cache entries.
   */
  public void setExpiryType(ExpiryType expiryType)
  {
    this.expiryType = expiryType;
  }

  /**
   * Returns the cache manager through which lookups are performed.
   *
   * @return Cache manager in use; a {@code NullValuesCacheManager} unless one was set explicitly.
   */
  public CacheManager getCacheManager()
  {
    return cacheManager;
  }

  /**
   * Sets the cache manager through which lookups are performed.
   *
   * @param cacheManager Cache manager to use.
   */
  public void setCacheManager(CacheManager cacheManager)
  {
    this.cacheManager = cacheManager;
  }
}