blob: ce02c08844da1e400a48adf1853e301567701da0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.enrich;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Lists;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.lib.util.FieldInfo;
/**
* <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements BackendLoaders interface.</p> <br/>
* <p>
* Properties:<br>
* <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
* <b>tableName</b>: JDBC table name<br>
* <br>
*
* @since 3.4.0
*/
@InterfaceStability.Evolving
public class JDBCLoader extends JdbcStore implements BackendLoader
{
protected String queryStmt;
protected String tableName;
protected transient List<FieldInfo> includeFieldInfo;
protected transient List<FieldInfo> lookupFieldInfo;
protected Object getQueryResult(Object key)
{
try {
PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
ArrayList<Object> keys = (ArrayList<Object>)key;
for (int i = 0; i < keys.size(); i++) {
getStatement.setObject(i + 1, keys.get(i));
}
return getStatement.executeQuery();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
{
try {
ResultSet resultSet = (ResultSet)result;
if (resultSet.next()) {
ResultSetMetaData rsdata = resultSet.getMetaData();
// If the includefields is empty, populate it from ResultSetMetaData
if (CollectionUtils.isEmpty(includeFieldInfo)) {
if (includeFieldInfo == null) {
includeFieldInfo = new ArrayList<>();
}
for (int i = 1; i <= rsdata.getColumnCount(); i++) {
String columnName = rsdata.getColumnName(i);
// TODO: Take care of type conversion.
includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT));
}
}
ArrayList<Object> res = new ArrayList<Object>();
for (FieldInfo f : includeFieldInfo) {
res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f));
}
return res;
} else {
return null;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private Object getConvertedData(Object object, FieldInfo f)
{
if (f.getType().getJavaType() == object.getClass()) {
return object;
} else {
logger.warn("Type mismatch seen for field {}, returning as it is", f.getColumnName());
return null;
}
}
private String generateQueryStmt()
{
String stmt = "select * from " + tableName + " where ";
boolean first = true;
for (FieldInfo fieldInfo : lookupFieldInfo) {
if (first) {
first = false;
} else {
stmt += " and ";
}
stmt += fieldInfo.getColumnName() + " = ?";
}
logger.info("generateQueryStmt: {}", stmt);
return stmt;
}
public String getQueryStmt()
{
return queryStmt;
}
/**
* Set the sql Prepared Statement if the enrichment mechanism is query based.
*/
public void setQueryStmt(String queryStmt)
{
this.queryStmt = queryStmt;
}
public String getTableName()
{
return tableName;
}
/**
* Set the table name.
*/
public void setTableName(String tableName)
{
this.tableName = tableName;
}
@Override
public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo)
{
this.lookupFieldInfo = lookupFieldInfo;
this.includeFieldInfo = includeFieldInfo;
if ((queryStmt == null) || (queryStmt.length() == 0)) {
queryStmt = generateQueryStmt();
}
}
@Override
public Map<Object, Object> loadInitialData()
{
return null;
}
@Override
public Object get(Object key)
{
return getDataFrmResult(getQueryResult(key));
}
@Override
public List<Object> getAll(List<Object> keys)
{
List<Object> values = Lists.newArrayList();
for (Object key : keys) {
values.add(get(key));
}
return values;
}
@Override
public void put(Object key, Object value)
{
throw new UnsupportedOperationException("Not supported operation");
}
@Override
public void putAll(Map<Object, Object> m)
{
throw new UnsupportedOperationException("Not supported operation");
}
@Override
public void remove(Object key)
{
throw new UnsupportedOperationException("Not supported operation");
}
}