blob: b4be4809a104c780333d0ebe8c40e117c7e984f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.db.jdbc;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
import com.datatorrent.lib.util.PojoUtils.GetterDouble;
import com.datatorrent.lib.util.PojoUtils.GetterFloat;
import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterShort;
/**
 * <p>
 * AbstractJdbcPOJOOutputOperator class.</p>
 * A generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
 * <p>
 * For every tuple, each configured field is read from the POJO through a getter created in
 * {@link #activate(OperatorContext)} and bound to the prepared statement according to the
 * {@link java.sql.Types} constant stored at the same index of {@link #columnDataTypes}.</p>
 *
 * @displayName Jdbc Output Operator
 * @category Output
 * @tags database, sql, pojo, jdbc
 * @since 2.1.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
{
  private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPOJOOutputOperator.class);

  /** Configured column to pojo-field mappings. */
  private List<JdbcFieldInfo> fieldInfos = new ArrayList<>();

  /**
   * One {@link java.sql.Types} constant per statement parameter, index-aligned with
   * {@link #columnFieldGetters}. It is never assigned in this class; presumably subclasses populate it.
   */
  protected List<Integer> columnDataTypes;

  @NotNull
  private String tablename;

  /** Per-parameter field descriptors together with the getters created in {@link #activate(OperatorContext)}. */
  protected final transient List<ActiveFieldInfo> columnFieldGetters;

  /** Class of the incoming tuples, taken from the input port's {@code TUPLE_CLASS} attribute. */
  protected transient Class<?> pojoClass;

  /**
   * Input port for POJO tuples. On setup it records the tuple class of the port; every tuple is
   * handed over to the input port of the parent operator.
   */
  @InputPortFieldAnnotation(optional = true, schemaRequired = true)
  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
  {
    @Override
    public void setup(Context.PortContext context)
    {
      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
    }

    @Override
    public void process(Object t)
    {
      AbstractJdbcPOJOOutputOperator.super.input.process(t);
    }
  };

  public AbstractJdbcPOJOOutputOperator()
  {
    super();
    columnFieldGetters = Lists.newArrayList();
  }

  /**
   * Couples a {@link FieldInfo} with the accessor object built for it. In this class the accessor
   * is always a getter created by {@link PojoUtils}.
   */
  protected static class ActiveFieldInfo
  {
    final FieldInfo fieldInfo;
    Object setterOrGetter;

    ActiveFieldInfo(FieldInfo fieldInfo)
    {
      this.fieldInfo = fieldInfo;
    }
  }

  /**
   * Binds one statement parameter per entry of {@link #columnDataTypes}. The value is read from
   * the tuple with the getter stored at the same index of {@link #columnFieldGetters}; JDBC
   * parameter indices are 1-based, hence the {@code i + 1}.
   *
   * @param statement the statement whose parameters are set
   * @param tuple     the POJO the values are read from
   * @throws SQLException if the statement rejects a parameter
   */
  @Override
  @SuppressWarnings("unchecked")
  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
  {
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
      final int type = columnDataTypes.get(i);
      final int parameterIndex = i + 1;
      final ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
      // The casts mirror the getter flavour that activate() creates for the same SQL type.
      switch (type) {
        case Types.CHAR:
        case Types.VARCHAR:
          statement.setString(parameterIndex, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.BOOLEAN:
          statement.setBoolean(parameterIndex, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.TINYINT:
          statement.setByte(parameterIndex, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.SMALLINT:
          statement.setShort(parameterIndex, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.INTEGER:
          statement.setInt(parameterIndex, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.BIGINT:
          statement.setLong(parameterIndex, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.FLOAT:
          statement.setFloat(parameterIndex, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.DOUBLE:
          statement.setDouble(parameterIndex, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.DECIMAL:
          statement.setBigDecimal(parameterIndex,
              ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.TIMESTAMP:
          statement.setTimestamp(parameterIndex,
              ((Getter<Object, Timestamp>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.TIME:
          statement.setTime(parameterIndex, ((Getter<Object, Time>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        case Types.DATE:
          statement.setDate(parameterIndex, ((Getter<Object, Date>)activeFieldInfo.setterOrGetter).get(tuple));
          break;
        default:
          handleUnknownDataType(type, tuple, activeFieldInfo);
          break;
      }
    }
  }

  /**
   * Hook invoked for any SQL type that has no dedicated case in this class. It is called from
   * {@link #setStatementParameters(PreparedStatement, Object)} with the current tuple and from
   * {@link #activate(OperatorContext)} with a {@code null} tuple. This implementation always throws.
   *
   * @param type            the {@link java.sql.Types} constant that was not recognised
   * @param tuple           the tuple being written, or {@code null} when called during activation
   * @param activeFieldInfo the field descriptor for the affected parameter
   * @throws RuntimeException always, in this implementation
   */
  @SuppressWarnings("UnusedParameters")
  protected void handleUnknownDataType(int type, Object tuple, ActiveFieldInfo activeFieldInfo)
  {
    throw new RuntimeException("unsupported data type " + type);
  }

  /**
   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
   */
  public List<JdbcFieldInfo> getFieldInfos()
  {
    return fieldInfos;
  }

  /**
   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
   * For an active field, the value obtained by evaluating its pojoFieldExpression on a tuple is
   * bound as a parameter of the prepared statement.
   *
   * @description $[].columnName name of the database column name
   * @description $[].pojoFieldExpression pojo field name or expression
   * @useSchema $[].pojoFieldExpression input.fields[].name
   */
  public void setFieldInfos(List<JdbcFieldInfo> fieldInfos)
  {
    this.fieldInfos = fieldInfos;
  }

  /**
   * Gets the name of the table in database.
   */
  public String getTablename()
  {
    return tablename;
  }

  /**
   * Set the target table name in database
   * @param tablename : table name as it is stored in the database
   */
  public void setTablename(String tablename)
  {
    this.tablename = tablename;
  }

  /**
   * Activates the parent operator and then creates, for every entry of {@link #columnDataTypes},
   * a getter on {@link #pojoClass} that matches the entry's SQL type. The getter is stored in the
   * {@link ActiveFieldInfo} at the same index of {@link #columnFieldGetters}.
   *
   * @param context the operator context, passed through to the parent
   */
  @Override
  public void activate(OperatorContext context)
  {
    super.activate(context);
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
      final int type = columnDataTypes.get(i);
      final ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
      switch (type) {
        case Types.CHAR:
        case Types.VARCHAR:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
              expressionOf(activeFieldInfo), String.class);
          break;
        case Types.BOOLEAN:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.TINYINT:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.SMALLINT:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.INTEGER:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.BIGINT:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.FLOAT:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.DOUBLE:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, expressionOf(activeFieldInfo));
          break;
        case Types.DECIMAL:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
              expressionOf(activeFieldInfo), BigDecimal.class);
          break;
        case Types.TIMESTAMP:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
              expressionOf(activeFieldInfo), Timestamp.class);
          break;
        case Types.TIME:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
              expressionOf(activeFieldInfo), Time.class);
          break;
        case Types.DATE:
          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
              expressionOf(activeFieldInfo), Date.class);
          break;
        default:
          // No tuple exists yet at activation time, so the hook receives null.
          handleUnknownDataType(type, null, activeFieldInfo);
          break;
      }
    }
  }

  /** Returns the pojo field expression of the given descriptor. */
  private static String expressionOf(ActiveFieldInfo activeFieldInfo)
  {
    return activeFieldInfo.fieldInfo.getPojoFieldExpression();
  }

  /**
   * Function to initialize the list of {@link JdbcFieldInfo} from properties.xml. The value is
   * parsed as a JSON object with the keys {@code columnName}, {@code pojoFieldExpression},
   * {@code type} (a {@link FieldInfo.SupportType} name) and {@code sqlType} (an integer). The
   * resulting entry is stored at the given index; if the list is shorter than that, it is first
   * padded with {@code null}s.
   *
   * @param index position in the list of field infos to set
   * @param value JSON description of the field info
   * @throws RuntimeException if the value cannot be parsed or the index is invalid; the
   *                          underlying exception is attached as the cause
   */
  public void setFieldInfosItem(int index, String value)
  {
    try {
      JSONObject jo = new JSONObject(value);
      JdbcFieldInfo jdbcFieldInfo = new JdbcFieldInfo(jo.getString("columnName"), jo.getString("pojoFieldExpression"),
          FieldInfo.SupportType.valueOf(jo.getString("type")), jo.getInt("sqlType"));
      // Grow the list so that 'index' becomes a valid position.
      for (int missing = index - fieldInfos.size() + 1; missing > 0; missing--) {
        fieldInfos.add(null);
      }
      fieldInfos.set(index, jdbcFieldInfo);
    } catch (Exception e) {
      // Keep the original failure as the cause so that a broken property can be diagnosed.
      throw new RuntimeException("Exception in setting JdbcFieldInfo at index " + index + " from value " + value, e);
    }
  }
}