blob: c148a2aebd5442976c7c63f53d3da828fb159a51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hive.mapreduce;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.hive.PhoenixRowKey;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.hive.util.ColumnMappingUtils;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.hive.util.PhoenixUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Serialized class for SerDe
*
*/
public class PhoenixResultWritable implements Writable, DBWritable, Configurable {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixResultWritable.class);
private List<ColumnInfo> columnMetadataList;
private List<Object> valueList; // for output
private Map<String, Object> rowMap = new HashMap<>(); // for input
private Map<String, String> columnMap;
private int columnCount = -1;
private Configuration config;
private boolean isTransactional;
private Map<String, Object> rowKeyMap = new LinkedHashMap();
private List<String> primaryKeyColumnList;
public PhoenixResultWritable() {
}
public PhoenixResultWritable(Configuration config) throws IOException {
setConf(config);
}
public PhoenixResultWritable(Configuration config, List<ColumnInfo> columnMetadataList)
throws IOException {
this(config);
this.columnMetadataList = columnMetadataList;
valueList = new ArrayList<>(columnMetadataList.size());
}
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
// for write
public void clear() {
valueList.clear();
}
// for write
public void add(Object value) {
valueList.add(value);
}
@Override
public void write(PreparedStatement statement) throws SQLException {
ColumnInfo columnInfo = null;
Object value = null;
try {
for (int i = 0, limit = columnMetadataList.size(); i < limit; i++) {
columnInfo = columnMetadataList.get(i);
if (valueList.size() > i) {
value = valueList.get(i);
} else {
value = null;
}
if (value == null) {
statement.setNull(i + 1, columnInfo.getSqlType());
} else {
statement.setObject(i + 1, value, columnInfo.getSqlType());
}
}
} catch (SQLException | RuntimeException e) {
LOG.error("[column-info, value] : " + columnInfo + ", " + value);
throw e;
}
}
public void delete(PreparedStatement statement) throws SQLException {
ColumnInfo columnInfo = null;
Object value = null;
try {
for (int i = 0, limit = primaryKeyColumnList.size(); i < limit; i++) {
columnInfo = columnMetadataList.get(i);
if (valueList.size() > i) {
value = valueList.get(i);
} else {
value = null;
}
if (value == null) {
statement.setNull(i + 1, columnInfo.getSqlType());
} else {
statement.setObject(i + 1, value, columnInfo.getSqlType());
}
}
} catch (SQLException | RuntimeException e) {
LOG.error("[column-info, value] : " + columnInfo + ", " + value);
throw e;
}
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
ResultSetMetaData rsmd = resultSet.getMetaData();
if (columnCount == -1) {
this.columnCount = rsmd.getColumnCount();
}
rowMap.clear();
for (int i = 0; i < columnCount; i++) {
Object value = resultSet.getObject(i + 1);
String columnName = rsmd.getColumnName(i + 1);
String mapName = columnMap.get(columnName);
if(mapName != null) {
columnName = mapName;
}
rowMap.put(columnName, value);
}
// Adding row__id column.
if (isTransactional) {
rowKeyMap.clear();
for (String pkColumn : primaryKeyColumnList) {
rowKeyMap.put(pkColumn, rowMap.get(pkColumn));
}
}
}
public void readPrimaryKey(PhoenixRowKey rowKey) {
rowKey.setRowKeyMap(rowKeyMap);
}
public List<ColumnInfo> getColumnMetadataList() {
return columnMetadataList;
}
public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
this.columnMetadataList = columnMetadataList;
}
public Map<String, Object> getResultMap() {
return rowMap;
}
public List<Object> getValueList() {
return valueList;
}
@Override
public void setConf(Configuration conf) {
config = conf;
this.columnMap = ColumnMappingUtils.getReverseColumnMapping(config.get(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING,""));
isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(config);
if (isTransactional) {
primaryKeyColumnList = PhoenixUtil.getPrimaryKeyColumnList(config, config.get
(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME));
}
}
@Override
public Configuration getConf() {
return config;
}
}