/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.phoenix.util.json;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.function.EncodeFormat;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.UpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;

/** {@link UpsertExecutor} over {@link Map} objects, as parsed from JSON. */
public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> {

    protected static final Logger LOG = LoggerFactory.getLogger(JsonUpsertExecutor.class);

    /** Testing constructor. Do not use in production. */
@VisibleForTesting
protected JsonUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList,
PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) {
super(conn, columnInfoList, stmt, upsertListener);
finishInit();
}
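
    /**
     * Creates an executor that prepares an UPSERT statement for {@code tableName} over the given
     * columns and writes one row per parsed JSON record. A rough usage sketch (illustrative only;
     * the table name and listener are placeholders, and the {@code execute(Iterable)} entry point
     * is assumed to be inherited from the {@link UpsertExecutor} base class):
     * <pre>{@code
     * JsonUpsertExecutor executor =
     *         new JsonUpsertExecutor(conn, "MY_TABLE", columnInfos, listener);
     * executor.execute(jsonRecords); // Iterable of Map<?, ?> parsed from JSON
     * executor.close();
     * }</pre>
     */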
public JsonUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList,
UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) {
super(conn, tableName, columnInfoList, upsertListener);
finishInit();
}
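
    /**
     * Binds the values of a single parsed JSON record to the prepared UPSERT and executes it.
     * Column names are matched to JSON keys after being lower-cased, stripped of their column
     * family prefix, and stripped of surrounding quotes; conversion or execution failures are
     * reported to the {@link UpsertListener} rather than rethrown.
     */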
@Override
protected void execute(Map<?, ?> record) {
int fieldIndex = 0;
String colName = null;
try {
if (record.size() < conversionFunctions.size()) {
String message = String.format("JSON record does not have enough values (has %d, but needs %d)",
record.size(), conversionFunctions.size());
throw new IllegalArgumentException(message);
}
for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++) {
colName = CaseFormat.UPPER_UNDERSCORE.to(
CaseFormat.LOWER_UNDERSCORE, columnInfos.get(fieldIndex).getColumnName());
if (colName.contains(".")) {
StringBuilder sb = new StringBuilder();
String[] parts = colName.split("\\.");
                    // Assume the first part is the column family name and omit it; rejoin the rest.
for (int i = 1; i < parts.length; i++) {
sb.append(parts[i]);
if (i != parts.length - 1) {
sb.append(".");
}
}
colName = sb.toString();
}
if (colName.contains("\"")) {
colName = colName.replace("\"", "");
}
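                // Convert the JSON value for this column to its SQL representation and bind it;
                // a missing or null value is bound as a SQL NULL of the column's type.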
Object sqlValue = conversionFunctions.get(fieldIndex).apply(record.get(colName));
if (sqlValue != null) {
preparedStatement.setObject(fieldIndex + 1, sqlValue);
} else {
preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType());
}
}
preparedStatement.execute();
upsertListener.upsertDone(++upsertCount);
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
// Even though this is an error we only log it with debug logging because we're notifying the
// listener, and it can do its own logging if needed
LOG.debug("Error on record " + record + ", fieldIndex " + fieldIndex + ", colName " + colName, e);
}
upsertListener.errorOnRecord(record, new Exception("fieldIndex: " + fieldIndex + ", colName " + colName, e));
}
}

    @Override
public void close() throws IOException {
try {
preparedStatement.close();
} catch (SQLException e) {
// An exception while closing the prepared statement is most likely a sign of a real problem, so we don't
// want to hide it with closeQuietly or something similar
throw new RuntimeException(e);
}
}
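
    /**
     * Returns the converter for a column of the given type: array columns are converted via
     * {@link ObjectToArrayConverter} using the array's element type, while all other columns go
     * through {@link SimpleDatatypeConversionFunction}.
     */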
@Override
protected Function<Object, Object> createConversionFunction(PDataType dataType) {
if (dataType.isArrayType()) {
return new ArrayDatatypeConversionFunction(
new ObjectToArrayConverter(
conn,
PDataType.fromTypeId(dataType.getSqlType() - PDataType.ARRAY_TYPE_BASE)));
} else {
return new SimpleDatatypeConversionFunction(dataType, this.conn);
}
}

    /**
     * Performs typed conversion from JSON field values to a given column value type.
     */
static class SimpleDatatypeConversionFunction implements Function<Object, Object> {
private final PDataType dataType;
private final DateUtil.DateTimeParser dateTimeParser;
private final String binaryEncoding;
SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
Properties props;
try {
props = conn.getClientInfo();
} catch (SQLException e) {
throw new RuntimeException(e);
}
this.dataType = dataType;
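            // For temporal columns, build a parser from the client's configured date, time, or
            // timestamp format and time zone.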
if (dataType.isCoercibleTo(PTimestamp.INSTANCE)) {
// TODO: move to DateUtil
String dateFormat;
int dateSqlType = dataType.getResultSetSqlType();
if (dateSqlType == Types.DATE) {
dateFormat = props.getProperty(QueryServices.DATE_FORMAT_ATTRIB,
DateUtil.DEFAULT_DATE_FORMAT);
} else if (dateSqlType == Types.TIME) {
dateFormat = props.getProperty(QueryServices.TIME_FORMAT_ATTRIB,
DateUtil.DEFAULT_TIME_FORMAT);
} else {
dateFormat = props.getProperty(QueryServices.TIMESTAMP_FORMAT_ATTRIB,
DateUtil.DEFAULT_TIMESTAMP_FORMAT);
}
String timeZoneId = props.getProperty(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
QueryServicesOptions.DEFAULT_DATE_FORMAT_TIMEZONE);
this.dateTimeParser = DateUtil.getDateTimeParser(dateFormat, dataType, timeZoneId);
} else {
this.dateTimeParser = null;
}
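            // Encoding expected for values destined for (VAR)BINARY columns, e.g. BASE64 or ASCII.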
this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,
QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
}

        @Nullable
@Override
public Object apply(@Nullable Object input) {
if (input == null) {
return null;
}
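            // Temporal values arrive as formatted strings: parse them to an epoch time and
            // re-encode into the column type's native byte representation.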
if (dateTimeParser != null && input instanceof String) {
final String s = (String) input;
long epochTime = dateTimeParser.parseDateTime(s);
byte[] byteValue = new byte[dataType.getByteSize()];
dataType.getCodec().encodeLong(epochTime, byteValue, 0);
return dataType.toObject(byteValue);
            } else if (dataType == PBoolean.INSTANCE) {
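                // Accept the common textual and numeric spellings of booleans; anything else is rejected.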
switch (input.toString()) {
case "true":
case "t":
case "T":
case "1":
return Boolean.TRUE;
case "false":
case "f":
case "F":
case "0":
return Boolean.FALSE;
default:
throw new RuntimeException("Invalid boolean value: '" + input
+ "', must be one of ['true','t','1','false','f','0']");
}
            } else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE) {
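                // Decode binary input according to the configured upload encoding.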
EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase());
Object object = null;
switch (format) {
case BASE64:
object = Base64.decode(input.toString());
                        if (object == null) {
                            throw new IllegalDataException(
                                    "Input: [" + input + "] is not base64 encoded");
                        }
break;
case ASCII:
object = Bytes.toBytes(input.toString());
break;
default:
throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding + "\"");
}
return object;
}
return dataType.toObject(input, dataType);
}
}

    /**
     * Converts JSON array values into Phoenix arrays of the correct element type.
     */
private static class ArrayDatatypeConversionFunction implements Function<Object, Object> {
private final ObjectToArrayConverter arrayConverter;
private ArrayDatatypeConversionFunction(ObjectToArrayConverter arrayConverter) {
this.arrayConverter = arrayConverter;
}

        @Nullable
@Override
public Object apply(@Nullable Object input) {
try {
return arrayConverter.toArray(input);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
}