// blob: d21cc4aa98496e8420446ff20ae55d217997cb9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.hbase.util;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Utilities for HBase serialization and deserialization. */
public class HBaseSerde {
/** Shared zero-length array that is written for null values of nullable non-string columns. */
private static final byte[] EMPTY_BYTES = new byte[] {};

// Inclusive precision range accepted for TIMESTAMP columns by the encoders and decoders.
private static final int MIN_TIMESTAMP_PRECISION = 0;
private static final int MAX_TIMESTAMP_PRECISION = 3;

// Inclusive precision range accepted for TIME columns by the encoders and decoders.
private static final int MIN_TIME_PRECISION = 0;
private static final int MAX_TIME_PRECISION = 3;

// UTF-8 bytes of the null string literal; they represent null in nullable string columns.
private final byte[] nullStringBytes;

// row key index in output row; -1 is treated as "no row key"
private final int rowkeyIndex;

// family keys
private final byte[][] families;

// qualifier keys, indexed as [family][qualifier]
private final byte[][][] qualifiers;

// number of top-level fields: one per family, plus one for the row key when it exists
private final int fieldLength;

// Output rows handed out by the reusing conversion methods; assigned once in the constructor.
private final GenericRowData reusedRow;
private final GenericRowData[] reusedFamilyRows;

// Row key codec; both are null when the schema has no usable row key.
private final @Nullable FieldEncoder keyEncoder;
private final @Nullable FieldDecoder keyDecoder;

// Per-column codecs, indexed as [family][qualifier] in step with the qualifier keys.
private final FieldEncoder[][] qualifierEncoders;
private final FieldDecoder[][] qualifierDecoders;

// Single-field scratch row that createGet fills with the lookup key before encoding it.
private final GenericRowData rowWithRowKey;
/**
 * Builds a serde for the given HBase table schema.
 *
 * <p>All row key and column encoders/decoders are created here, so a column type that the
 * factories reject fails construction with an {@link UnsupportedOperationException}.
 *
 * @param hbaseSchema schema that supplies the family keys, qualifier keys, qualifier data
 *     types and the optional row key
 * @param nullStringLiteral literal whose UTF-8 bytes stand for null in nullable string columns
 */
public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {
    this.families = hbaseSchema.getFamilyKeys();
    this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
    final LogicalType rowkeyType =
            hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);

    // A declared row key takes one extra top-level field next to the column families.
    final boolean hasRowKey = rowkeyIndex != -1 && rowkeyType != null;
    this.fieldLength = hasRowKey ? families.length + 1 : families.length;
    this.keyEncoder = hasRowKey ? createFieldEncoder(rowkeyType) : null;
    this.keyDecoder = hasRowKey ? createFieldDecoder(rowkeyType) : null;

    this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);

    // prepare output rows and the per-family lookup tables
    final int familyCount = families.length;
    this.reusedRow = new GenericRowData(fieldLength);
    this.reusedFamilyRows = new GenericRowData[familyCount];
    this.qualifiers = new byte[familyCount][][];
    this.qualifierEncoders = new FieldEncoder[familyCount][];
    this.qualifierDecoders = new FieldDecoder[familyCount][];

    final String[] familyNames = hbaseSchema.getFamilyNames();
    for (int f = 0; f < familyCount; f++) {
        final String familyName = familyNames[f];
        this.qualifiers[f] = hbaseSchema.getQualifierKeys(familyName);

        final DataType[] columnTypes = hbaseSchema.getQualifierDataTypes(familyName);
        final int columnCount = columnTypes.length;

        // Encoders are built for the whole family before any decoder is.
        final FieldEncoder[] encoders = new FieldEncoder[columnCount];
        for (int q = 0; q < columnCount; q++) {
            encoders[q] =
                    createNullableFieldEncoder(
                            columnTypes[q].getLogicalType(), nullStringBytes);
        }
        final FieldDecoder[] decoders = new FieldDecoder[columnCount];
        for (int q = 0; q < columnCount; q++) {
            decoders[q] =
                    createNullableFieldDecoder(
                            columnTypes[q].getLogicalType(), nullStringBytes);
        }

        this.qualifierEncoders[f] = encoders;
        this.qualifierDecoders[f] = decoders;
        this.reusedFamilyRows[f] = new GenericRowData(columnCount);
    }

    this.rowWithRowKey = new GenericRowData(1);
}
/**
 * Builds the {@link Put} that writes the given record to the HBase table.
 *
 * <p>One column is added for every qualifier of every family, using the value encoded from
 * the matching nested family row.
 *
 * @param row record holding the row key field and one nested row per family
 * @return the Put for this record, or {@code null} when the encoded row key is empty
 */
public @Nullable Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    final byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }

    // upsert
    final Put put = new Put(rowkey);
    for (int fieldPos = 0; fieldPos < fieldLength; fieldPos++) {
        if (fieldPos == rowkeyIndex) {
            continue;
        }
        // Fields after the row key are shifted by one relative to the family arrays.
        final int familyIdx = fieldPos > rowkeyIndex ? fieldPos - 1 : fieldPos;
        final byte[] familyKey = families[familyIdx];
        final byte[][] familyQualifiers = qualifiers[familyIdx];
        final RowData familyRow = row.getRow(fieldPos, familyQualifiers.length);
        for (int q = 0; q < familyQualifiers.length; q++) {
            // serialize the value of this qualifier
            final byte[] encoded = qualifierEncoders[familyIdx][q].encode(familyRow, q);
            put.addColumn(familyKey, familyQualifiers[q], encoded);
        }
    }
    return put;
}
/**
 * Builds the {@link Delete} that removes the given record from the HBase table.
 *
 * <p>Only the row key of {@code row} is read; every qualifier of every family is added to
 * the mutation.
 *
 * <p>NOTE(review): columns are registered through {@code Delete#addColumn}; per the HBase
 * client API this presumably targets only the latest version of each column. Confirm that
 * this is the intended delete scope.
 *
 * @param row record whose row key identifies the HBase row
 * @return the Delete for this record, or {@code null} when the encoded row key is empty
 */
public @Nullable Delete createDeleteMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    final byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }

    // delete
    final Delete delete = new Delete(rowkey);
    for (int fieldPos = 0; fieldPos < fieldLength; fieldPos++) {
        if (fieldPos == rowkeyIndex) {
            continue;
        }
        // Fields after the row key are shifted by one relative to the family arrays.
        final int familyIdx = fieldPos > rowkeyIndex ? fieldPos - 1 : fieldPos;
        final byte[] familyKey = families[familyIdx];
        final byte[][] familyQualifiers = qualifiers[familyIdx];
        for (int q = 0; q < familyQualifiers.length; q++) {
            delete.addColumn(familyKey, familyQualifiers[q]);
        }
    }
    return delete;
}
/**
 * Builds a {@link Scan} restricted to the columns known to this serde.
 *
 * @return a Scan to which every (family, qualifier) pair of the schema has been added
 */
public Scan createScan() {
    final Scan scan = new Scan();
    for (int familyIdx = 0; familyIdx < families.length; familyIdx++) {
        final byte[] familyKey = families[familyIdx];
        for (byte[] qualifierKey : qualifiers[familyIdx]) {
            scan.addColumn(familyKey, qualifierKey);
        }
    }
    return scan;
}
/**
 * Returns an instance of Get that retrieves the record matching the given row key from the
 * HBase table.
 *
 * <p>The key is encoded through a scratch row shared by all calls on this instance, so this
 * method must not be called concurrently on the same serde.
 *
 * @param rowKey row key value in the internal data structure expected by the key encoder
 * @return the Get restricted to the columns of this serde, or {@code null} when the encoded
 *     row key is empty
 */
public @Nullable Get createGet(Object rowKey) {
    checkArgument(keyEncoder != null, "row key is not set.");
    // Wrap the bare key in a one-field row because the encoder reads from RowData.
    rowWithRowKey.setField(0, rowKey);
    byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    Get get = new Get(rowkey);
    for (int f = 0; f < families.length; f++) {
        byte[] family = families[f];
        for (byte[] qualifier : qualifiers[f]) {
            get.addColumn(family, qualifier);
        }
    }
    return get;
}
/**
 * Converts HBase {@link Result} into a new {@link RowData} instance.
 *
 * <p>Note: this method is thread-safe.
 *
 * @param result HBase result to decode
 * @return a freshly allocated row holding the decoded row key and family rows
 */
public RowData convertToNewRow(Result result) {
    // Fresh output objects are allocated on every call so that a caller may keep the
    // returned row (for example in a cache) without it being overwritten later.
    final GenericRowData[] freshFamilyRows = new GenericRowData[families.length];
    Arrays.setAll(freshFamilyRows, f -> new GenericRowData(qualifiers[f].length));
    return convertToRow(result, new GenericRowData(fieldLength), freshFamilyRows);
}
/**
 * Converts HBase {@link Result} into the {@link RowData} instance owned by this serde.
 *
 * <p>Note: this method is NOT thread-safe; each call overwrites and returns the same row
 * objects.
 *
 * @param result HBase result to decode
 * @return the reused row, filled with the decoded values
 */
public RowData convertToReusedRow(Result result) {
    final GenericRowData target = reusedRow;
    final GenericRowData[] familyTargets = reusedFamilyRows;
    return convertToRow(result, target, familyTargets);
}
/**
 * Decodes an HBase {@link Result} into the supplied output rows.
 *
 * @param result HBase result to decode
 * @param resultRow top-level row that receives the row key and one nested row per family
 * @param familyRows nested rows, one per family, that receive the decoded qualifier values
 * @return {@code resultRow}
 */
private RowData convertToRow(
        Result result, GenericRowData resultRow, GenericRowData[] familyRows) {
    for (int fieldPos = 0; fieldPos < fieldLength; fieldPos++) {
        if (fieldPos == rowkeyIndex) {
            assert keyDecoder != null;
            resultRow.setField(rowkeyIndex, keyDecoder.decode(result.getRow()));
            continue;
        }

        // Fields after the row key are shifted by one relative to the family arrays.
        final boolean afterRowKey = rowkeyIndex != -1 && fieldPos > rowkeyIndex;
        final int familyIdx = afterRowKey ? fieldPos - 1 : fieldPos;

        final byte[] familyKey = families[familyIdx];
        final byte[][] familyQualifiers = qualifiers[familyIdx];
        final FieldDecoder[] decoders = qualifierDecoders[familyIdx];
        final GenericRowData familyRow = familyRows[familyIdx];
        for (int q = 0; q < familyQualifiers.length; q++) {
            // read the raw cell value and decode it into the nested row
            final byte[] rawValue = result.getValue(familyKey, familyQualifiers[q]);
            familyRow.setField(q, decoders[q].decode(rawValue));
        }
        resultRow.setField(fieldPos, familyRow);
    }
    return resultRow;
}
/**
 * Converts HBase {@link Result} into {@link RowData}.
 *
 * <p>The returned row and its nested family rows are the instances reused by this serde, so
 * each call overwrites the values returned by the previous one.
 *
 * @param result HBase result to decode
 * @return the reused row, filled with the decoded values
 * @deprecated Use {@link #convertToReusedRow(Result)} instead.
 */
@Deprecated
public RowData convertToRow(Result result) {
    // Same decoding as the reusing path; delegate so the two cannot drift apart.
    return convertToRow(result, reusedRow, reusedFamilyRows);
}
// ------------------------------------------------------------------------------------
// HBase Runtime Encoders
// ------------------------------------------------------------------------------------
/**
 * Runtime encoder that encodes a specified field in {@link RowData} into byte[].
 *
 * <p>The interface is {@link Serializable}; the implementations in this class are lambdas.
 */
@FunctionalInterface
private interface FieldEncoder extends Serializable {
/**
 * Encodes one field of the given row.
 *
 * @param row row to read the field from
 * @param pos position of the field within {@code row}
 * @return the encoded bytes of that field
 */
byte[] encode(RowData row, int pos);
}
/**
 * Creates an encoder for the given type that also handles null fields when the type is
 * nullable.
 *
 * @param fieldType logical type of the column
 * @param nullStringBytes bytes written for a null value of a nullable string column
 * @return the plain encoder for non-nullable types, otherwise a null-aware wrapper around it
 */
private static FieldEncoder createNullableFieldEncoder(
        LogicalType fieldType, final byte[] nullStringBytes) {
    final FieldEncoder delegate = createFieldEncoder(fieldType);
    if (!fieldType.isNullable()) {
        return delegate;
    }

    // Strings get a dedicated null marker because HBase can store empty bytes for a
    // string; every other type encodes null as empty bytes.
    final byte[] nullBytes =
            fieldType.is(LogicalTypeFamily.CHARACTER_STRING) ? nullStringBytes : EMPTY_BYTES;
    return (row, pos) -> row.isNullAt(pos) ? nullBytes : delegate.encode(row, pos);
}
/**
 * Creates the encoder for a non-null field of the given logical type.
 *
 * @param fieldType logical type of the field
 * @return encoder that reads the field from a row and turns it into bytes
 * @throws UnsupportedOperationException if the type root is not handled here, or if a TIME
 *     or TIMESTAMP precision lies outside the supported range
 */
private static FieldEncoder createFieldEncoder(LogicalType fieldType) {
    // ordered by type root definition
    switch (fieldType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            // get the underlying UTF-8 bytes
            return (data, idx) -> data.getString(idx).toBytes();
        case BOOLEAN:
            return (data, idx) -> Bytes.toBytes(data.getBoolean(idx));
        case BINARY:
        case VARBINARY:
            return (data, idx) -> data.getBinary(idx);
        case DECIMAL:
            return createDecimalEncoder((DecimalType) fieldType);
        case TINYINT:
            return (data, idx) -> new byte[] {data.getByte(idx)};
        case SMALLINT:
            return (data, idx) -> Bytes.toBytes(data.getShort(idx));
        case INTEGER:
        case DATE:
        case INTERVAL_YEAR_MONTH:
            return (data, idx) -> Bytes.toBytes(data.getInt(idx));
        case TIME_WITHOUT_TIME_ZONE:
            {
                final int precision = getPrecision(fieldType);
                final boolean supported =
                        precision >= MIN_TIME_PRECISION && precision <= MAX_TIME_PRECISION;
                if (!supported) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                            + "HBase connector",
                                    precision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
                }
                // TIME is read from the row as an int, like INTEGER
                return (data, idx) -> Bytes.toBytes(data.getInt(idx));
            }
        case BIGINT:
        case INTERVAL_DAY_TIME:
            return (data, idx) -> Bytes.toBytes(data.getLong(idx));
        case FLOAT:
            return (data, idx) -> Bytes.toBytes(data.getFloat(idx));
        case DOUBLE:
            return (data, idx) -> Bytes.toBytes(data.getDouble(idx));
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            {
                final int precision = getPrecision(fieldType);
                final boolean supported =
                        precision >= MIN_TIMESTAMP_PRECISION
                                && precision <= MAX_TIMESTAMP_PRECISION;
                if (!supported) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                            + "HBase connector",
                                    precision,
                                    MIN_TIMESTAMP_PRECISION,
                                    MAX_TIMESTAMP_PRECISION));
                }
                return createTimestampEncoder(precision);
            }
        default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
    }
}
/**
 * Creates an encoder that writes a decimal field as the bytes of its {@link BigDecimal}.
 *
 * @param decimalType decimal type supplying the precision and scale used to read the field
 * @return encoder for decimal fields of that type
 */
private static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
    // Resolve precision and scale once, outside the per-record lambda.
    final int decimalPrecision = decimalType.getPrecision();
    final int decimalScale = decimalType.getScale();
    return (row, pos) ->
            Bytes.toBytes(row.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal());
}
/**
 * Creates an encoder that writes a timestamp field as the bytes of its millisecond value.
 *
 * @param precision timestamp precision used to read the field from the row
 * @return encoder for timestamp fields
 */
private static FieldEncoder createTimestampEncoder(final int precision) {
    return (row, pos) -> Bytes.toBytes(row.getTimestamp(pos, precision).getMillisecond());
}
// ------------------------------------------------------------------------------------
// HBase Runtime Decoders
// ------------------------------------------------------------------------------------
/**
 * Runtime decoder that decodes a byte[] into objects of internal data structure.
 *
 * <p>The interface is {@link Serializable}; the implementations in this class are lambdas and
 * method references.
 */
@FunctionalInterface
private interface FieldDecoder extends Serializable {
/**
 * Decodes the given bytes.
 *
 * @param value raw bytes to decode
 * @return the decoded object, which may be {@code null}
 */
@Nullable
Object decode(byte[] value);
}
/**
 * Creates a decoder for the given type that also maps the stored null representation back to
 * {@code null} when the type is nullable.
 *
 * @param fieldType logical type of the column
 * @param nullStringBytes bytes that represent null in a nullable string column
 * @return the plain decoder for non-nullable types, otherwise a null-aware wrapper around it
 */
private static FieldDecoder createNullableFieldDecoder(
        LogicalType fieldType, final byte[] nullStringBytes) {
    final FieldDecoder delegate = createFieldDecoder(fieldType);
    if (!fieldType.isNullable()) {
        return delegate;
    }
    if (fieldType.is(LogicalTypeFamily.CHARACTER_STRING)) {
        // strings: a missing value or the null marker bytes decode to null
        return bytes ->
                (bytes == null || Arrays.equals(bytes, nullStringBytes))
                        ? null
                        : delegate.decode(bytes);
    }
    // other types: a missing value or empty bytes decode to null
    return bytes -> (bytes == null || bytes.length == 0) ? null : delegate.decode(bytes);
}
/**
 * Creates the decoder for non-null bytes of the given logical type.
 *
 * @param fieldType logical type of the field
 * @return decoder that turns stored bytes into the internal data structure of that type
 * @throws UnsupportedOperationException if the type root is not handled here, or if a TIME
 *     or TIMESTAMP precision lies outside the supported range
 */
private static FieldDecoder createFieldDecoder(LogicalType fieldType) {
    // ordered by type root definition
    switch (fieldType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            // reuse bytes
            return bytes -> StringData.fromBytes(bytes);
        case BOOLEAN:
            return bytes -> Bytes.toBoolean(bytes);
        case BINARY:
        case VARBINARY:
            return bytes -> bytes;
        case DECIMAL:
            return createDecimalDecoder((DecimalType) fieldType);
        case TINYINT:
            return bytes -> bytes[0];
        case SMALLINT:
            return bytes -> Bytes.toShort(bytes);
        case INTEGER:
        case DATE:
        case INTERVAL_YEAR_MONTH:
            return bytes -> Bytes.toInt(bytes);
        case TIME_WITHOUT_TIME_ZONE:
            {
                final int precision = getPrecision(fieldType);
                final boolean supported =
                        precision >= MIN_TIME_PRECISION && precision <= MAX_TIME_PRECISION;
                if (!supported) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                            + "HBase connector",
                                    precision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
                }
                // TIME is decoded as an int, like INTEGER
                return bytes -> Bytes.toInt(bytes);
            }
        case BIGINT:
        case INTERVAL_DAY_TIME:
            return bytes -> Bytes.toLong(bytes);
        case FLOAT:
            return bytes -> Bytes.toFloat(bytes);
        case DOUBLE:
            return bytes -> Bytes.toDouble(bytes);
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            {
                final int precision = getPrecision(fieldType);
                final boolean supported =
                        precision >= MIN_TIMESTAMP_PRECISION
                                && precision <= MAX_TIMESTAMP_PRECISION;
                if (!supported) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                            + "HBase connector",
                                    precision,
                                    MIN_TIMESTAMP_PRECISION,
                                    MAX_TIMESTAMP_PRECISION));
                }
                return createTimestampDecoder();
            }
        default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
    }
}
/**
 * Creates a decoder that reads stored bytes as a {@link BigDecimal} and wraps it in
 * {@link DecimalData}.
 *
 * @param decimalType decimal type supplying the precision and scale of the result
 * @return decoder for decimal fields of that type
 */
private static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
    // Resolve precision and scale once, outside the per-record lambda.
    final int decimalPrecision = decimalType.getPrecision();
    final int decimalScale = decimalType.getScale();
    return bytes ->
            DecimalData.fromBigDecimal(
                    Bytes.toBigDecimal(bytes), decimalPrecision, decimalScale);
}
/**
 * Creates a decoder that reads stored bytes as a long of epoch milliseconds and wraps it in
 * {@link TimestampData}.
 *
 * @return decoder for timestamp fields
 */
private static FieldDecoder createTimestampDecoder() {
    // TODO: support higher precision
    return bytes -> TimestampData.fromEpochMillis(Bytes.toLong(bytes));
}
}