// blob: 0d3e724102362b3cf2826015bfe8bd59d95a782a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ColumnInfo;
import org.joda.time.DateTime;
/**
 * A {@link DBWritable} that moves one row between a MapReduce job and a JDBC statement or
 * result set.
 *
 * <p>For output, callers {@link #add(Object) add} one value per column, in the same order as the
 * column metadata passed to the constructor, and {@link #write(PreparedStatement)} binds them to
 * the statement. For input, {@link #readFields(ResultSet)} copies the current row of a
 * {@link ResultSet} into a label-to-value map exposed through {@link #getResultMap()}.
 */
public class PhoenixRecordWritable implements DBWritable {

    /** Values to bind, in column order; filled through {@link #add(Object)}. */
    private final List<Object> upsertValues = new ArrayList<>();

    /** Column label to value for the most recently read row, in result-set column order. */
    private final Map<String, Object> resultMap = new LinkedHashMap<>();

    /** Column metadata, positionally aligned with {@link #upsertValues}. */
    private final List<ColumnInfo> columnMetaDataList;

    /** For serialization; do not use. */
    public PhoenixRecordWritable() {
        this(new ArrayList<ColumnInfo>());
    }

    /**
     * @param columnMetaDataList metadata for the columns that will be written, in the order the
     *        values will be added
     */
    public PhoenixRecordWritable(List<ColumnInfo> columnMetaDataList) {
        this.columnMetaDataList = columnMetaDataList;
    }

    /**
     * Helper method to create a {@link Array} for a specific {@link PDataType}, and set it on
     * the provided {@code stmt}.
     *
     * @param stmt statement to bind on; its connection is used to create the SQL array
     * @param type array type of the target column; its base type names the element SQL type
     * @param obj boxed array elements
     * @param position 1-based parameter index
     * @throws SQLException if the array cannot be created or bound
     */
    private static void setArrayInStatement(PreparedStatement stmt, PDataType<?> type,
            Object[] obj, int position) throws SQLException {
        Array sqlArray = stmt.getConnection().createArrayOf(
            PDataType.arrayBaseType(type).getSqlTypeName(), obj);
        stmt.setArray(position, sqlArray);
    }

    /** Boxes each element of {@code a} into a new {@code Byte[]}. */
    private static Object[] primitiveArrayToObjectArray(byte[] a) {
        final Byte[] ret = new Byte[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /** Boxes each element of {@code a} into a new {@code Short[]}. */
    private static Object[] primitiveArrayToObjectArray(short[] a) {
        final Short[] ret = new Short[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /** Boxes each element of {@code a} into a new {@code Integer[]}. */
    private static Object[] primitiveArrayToObjectArray(int[] a) {
        final Integer[] ret = new Integer[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /** Boxes each element of {@code a} into a new {@code Float[]}. */
    private static Object[] primitiveArrayToObjectArray(float[] a) {
        final Float[] ret = new Float[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /** Boxes each element of {@code a} into a new {@code Double[]}. */
    private static Object[] primitiveArrayToObjectArray(double[] a) {
        final Double[] ret = new Double[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /** Boxes each element of {@code a} into a new {@code Long[]}. */
    private static Object[] primitiveArrayToObjectArray(long[] a) {
        final Long[] ret = new Long[a.length];
        for (int i = 0; i < a.length; i++) {
            ret[i] = a[i];
        }
        return ret;
    }

    /**
     * Tells whether {@code value} is already one of the JDBC temporal types, all of which extend
     * {@link java.util.Date}.
     */
    private static boolean isJdbcTemporal(Object value) {
        return value instanceof java.sql.Date
            || value instanceof java.sql.Time
            || value instanceof java.sql.Timestamp;
    }

    /**
     * Binds every added value to {@code statement}, value {@code i} going to parameter
     * {@code i + 1}.
     *
     * <p>{@code null} values are bound with {@code setNull} using the column's SQL type. Joda
     * {@link DateTime} and plain {@link java.util.Date} values are converted to
     * {@link java.sql.Date}. Object arrays and primitive arrays are bound as SQL arrays, except
     * {@code byte[]} for binary/varbinary columns, which is bound as an object. Anything else is
     * bound with {@code setObject}.
     *
     * @param statement the statement to populate
     * @throws UnsupportedOperationException if the number of added values differs from the
     *         number of columns in the metadata
     * @throws SQLException if binding a parameter fails
     */
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        // make sure we at least line up in size
        if (upsertValues.size() != columnMetaDataList.size()) {
            throw new UnsupportedOperationException("Provided " + upsertValues.size()
                + " upsert values, but column metadata expects " + columnMetaDataList.size()
                + " columns.");
        }

        // correlate each value (v) to a column type (c) and a 1-based parameter position
        for (int i = 0; i < upsertValues.size(); i++) {
            final int position = i + 1;
            final Object v = upsertValues.get(i);
            final ColumnInfo c = columnMetaDataList.get(i);

            if (v == null) {
                statement.setNull(position, c.getSqlType());
                continue;
            }

            // both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
            // can override any other types here as needed
            final Object finalObj;
            final PDataType<?> finalType;
            if (v instanceof DateTime) {
                finalObj = new java.sql.Date(((DateTime) v).getMillis());
                finalType = PDate.INSTANCE;
            } else if (v instanceof java.util.Date && !isJdbcTemporal(v)) {
                // Only non-JDBC java.util.Date values are converted. java.sql.Timestamp and
                // java.sql.Time also extend java.util.Date; rebuilding them from getTime()
                // would discard a Timestamp's sub-millisecond nanos and change the value's
                // JDBC type, so they are left untouched and handled below.
                finalObj = new java.sql.Date(((java.util.Date) v).getTime());
                finalType = PDate.INSTANCE;
            } else {
                finalObj = v;
                finalType = c.getPDataType();
            }

            if (finalObj instanceof Object[]) {
                setArrayInStatement(statement, finalType, (Object[]) finalObj, position);
            } else if (finalObj instanceof byte[]) {
                // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects
                if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
                    statement.setObject(position, finalObj);
                } else {
                    // otherwise set as array type
                    setArrayInStatement(statement, finalType,
                        primitiveArrayToObjectArray((byte[]) finalObj), position);
                }
            } else if (finalObj instanceof short[]) {
                setArrayInStatement(statement, finalType,
                    primitiveArrayToObjectArray((short[]) finalObj), position);
            } else if (finalObj instanceof int[]) {
                setArrayInStatement(statement, finalType,
                    primitiveArrayToObjectArray((int[]) finalObj), position);
            } else if (finalObj instanceof long[]) {
                setArrayInStatement(statement, finalType,
                    primitiveArrayToObjectArray((long[]) finalObj), position);
            } else if (finalObj instanceof float[]) {
                setArrayInStatement(statement, finalType,
                    primitiveArrayToObjectArray((float[]) finalObj), position);
            } else if (finalObj instanceof double[]) {
                setArrayInStatement(statement, finalType,
                    primitiveArrayToObjectArray((double[]) finalObj), position);
            } else {
                statement.setObject(position, finalObj);
            }
        }
    }

    /**
     * Copies every column of the result set's current row into the result map, keyed by column
     * label. An entry already present under the same label is overwritten.
     *
     * @param resultSet result set positioned on the row to read
     * @throws SQLException if the metadata or a column value cannot be read
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        // the column count does not change while reading one row, so fetch it once
        final int columnCount = metaData.getColumnCount();
        for (int i = 1; i <= columnCount; i++) {
            // the value is stored exactly as the driver returns it from getObject
            Object value = resultSet.getObject(i);
            // put a (ColumnLabel -> value) entry into the result map
            resultMap.put(metaData.getColumnLabel(i), value);
        }
    }

    /** Append an object to the list of values to upsert. */
    public void add(Object value) {
        upsertValues.add(value);
    }

    /**
     * @return a read-only view on the {@link ResultSet} content; the view reflects later calls
     *         to {@link #readFields(ResultSet)}
     */
    public Map<String, Object> getResultMap() {
        return Collections.unmodifiableMap(resultMap);
    }
}