blob: 35963915555161a172728b98743d4f4536337a83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.redis;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Setter;
import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
import com.datatorrent.lib.util.PojoUtils.SetterDouble;
import com.datatorrent.lib.util.PojoUtils.SetterFloat;
import com.datatorrent.lib.util.PojoUtils.SetterInt;
import com.datatorrent.lib.util.PojoUtils.SetterLong;
import com.datatorrent.lib.util.PojoUtils.SetterShort;
import com.datatorrent.netlet.util.DTThrowable;
/**
* This is a Redis input operator, which scans all keys in Redis store It
* converts Value stored as map to Plain Old Java Object. It outputs
* KeyValuePair with POJO as value
* <p>
* This output adapter Reads from RedisStore stored as <Key, Map> It outputs a
* Key value pair <key, POJO> as tuples.
* </p>
*
* @displayName Redis POJO Input Operator
* @category Store
* @tags output operator, key value
* @since 3.1.0
*/
@Evolving
public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>>
{
protected final Map<String, Object> map = new HashMap<String, Object>();
private ArrayList<FieldInfo> dataColumns;
private transient ArrayList<Object> setters;
private boolean isFirstTuple = true;
private String outputClass;
private Class<?> objectClass;
public RedisPOJOInputOperator()
{
super();
setters = new ArrayList<Object>();
}
@SuppressWarnings("unchecked")
private Object convertMapToObject(Map<String, String> tuple)
{
try {
Object mappedObject = objectClass.newInstance();
for (int i = 0; i < dataColumns.size(); i++) {
final SupportType type = dataColumns.get(i).getType();
final String columnName = dataColumns.get(i).getColumnName();
if (i < setters.size()) {
String value = tuple.get(columnName);
switch (type) {
case STRING:
((Setter<Object, String>) setters.get(i)).set(mappedObject, value);
break;
case BOOLEAN:
((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value));
break;
case SHORT:
((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value));
break;
case INTEGER:
((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value));
break;
case LONG:
((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value));
break;
case FLOAT:
((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value));
break;
case DOUBLE:
((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value));
break;
default:
break;
}
}
}
return mappedObject;
} catch (Exception e) {
DTThrowable.wrapIfChecked(e);
}
return null;
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
}
public void processFirstTuple(Map<String, String> value) throws ClassNotFoundException
{
objectClass = Class.forName(getOutputClass());
final int size = dataColumns.size();
for (int i = 0; i < size; i++) {
final SupportType type = dataColumns.get(i).getType();
final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
final Object setter;
switch (type) {
case STRING:
setter = PojoUtils.createSetter(objectClass, getterExpression, String.class);
break;
case BOOLEAN:
setter = PojoUtils.createSetterBoolean(objectClass, getterExpression);
break;
case SHORT:
setter = PojoUtils.createSetterShort(objectClass, getterExpression);
break;
case INTEGER:
setter = PojoUtils.createSetterInt(objectClass, getterExpression);
break;
case LONG:
setter = PojoUtils.createSetterLong(objectClass, getterExpression);
break;
case FLOAT:
setter = PojoUtils.createSetterFloat(objectClass, getterExpression);
break;
case DOUBLE:
setter = PojoUtils.createSetterDouble(objectClass, getterExpression);
break;
default:
setter = PojoUtils.createSetter(objectClass, getterExpression, Object.class);
break;
}
setters.add(setter);
}
}
@Override
public void processTuples()
{
for (String key : keys) {
if (store.getType(key).equals("hash")) {
Map<String, String> mapValue = store.getMap(key);
if (isFirstTuple) {
try {
processFirstTuple(mapValue);
} catch (ClassNotFoundException e) {
DTThrowable.rethrow(e);
}
}
isFirstTuple = false;
outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue)));
}
}
keys.clear();
}
/*
* Output class type
*/
public String getOutputClass()
{
return outputClass;
}
public void setOutputClass(String outputClass)
{
this.outputClass = outputClass;
}
/*
* An arraylist of data column names to be set in Redis store as a Map. Gets
* column names, column expressions and column data types
*/
public ArrayList<FieldInfo> getDataColumns()
{
return dataColumns;
}
public void setDataColumns(ArrayList<FieldInfo> dataColumns)
{
this.dataColumns = dataColumns;
}
@Override
public KeyValPair<String, Object> convertToTuple(Map<Object, Object> o)
{
// Do nothing for the override, Scan already done in processTuples
return null;
}
}