blob: 0f90e450ca952b570b8376ff666d7274765ccd24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reducer class for the bulkload jobs.
* Performs similar functionality to {@link KeyValueSortReducer}
*/
public class FormatToKeyValueReducer
extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
protected List<String> tableNames;
protected List<String> logicalNames;
protected KeyValueBuilder builder;
List<List<Pair<byte[], byte[]>>> columnIndexes;
List<ImmutableBytesPtr> emptyFamilyName;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// pass client configuration into driver
Properties clientInfos = new Properties();
for (Map.Entry<String, String> entry : conf) {
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
builder = conn.getKeyValueBuilder();
final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
columnIndexes = new ArrayList<>(tableNames.size());
emptyFamilyName = new ArrayList<>();
initColumnsMap(conn);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
for (String tableName : logicalNames) {
PTable table = PhoenixRuntime.getTable(conn, tableName);
emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
for(int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
if(c.getFamilyName() == null) {
list.add(null); // Skip PK column
continue;
}
byte[] family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
list.add(new Pair(family, name));
}
columnIndexes.add(list);
}
}
@Override
protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
int tableIndex = Integer.parseInt(key.getTableName());
key.setTableName(tableNames.get(tableIndex));
List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available()!= 0) {
int index = WritableUtils.readVInt(input);
Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
ImmutableBytesWritable value = null;
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
input.read(array);
value = new ImmutableBytesWritable(array);
}
KeyValue kv;
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
kv = builder.buildPut(key.getRowkey(),
new ImmutableBytesWritable(pair.getFirst()),
new ImmutableBytesWritable(pair.getSecond()), value);
break;
case DeleteColumn: // null value
kv = builder.buildDeleteColumns(key.getRowkey(),
new ImmutableBytesWritable(pair.getFirst()),
new ImmutableBytesWritable(pair.getSecond()));
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
}
map.add(kv);
}
KeyValue empty = builder.buildPut(key.getRowkey(),
emptyFamilyName.get(tableIndex),
QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
map.add(empty);
Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv : map) {
context.write(key, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}