/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hbase.table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connectors.hbase.util.HBaseBytesSerializer;
import org.apache.flink.connectors.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connectors.hbase.util.HBaseTypeUtils;
import org.apache.flink.table.api.RichTableSchema;
import org.apache.flink.table.api.functions.FunctionContext;
import org.apache.flink.table.api.functions.TableFunction;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

/**
* The HBaseLookupFunction is a standard user-defined table function. It can be used in the
* Table API and is also useful for temporal table join plans in SQL.
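*
* <p>A minimal usage sketch for registering the function, assuming a prepared
* {@code RichTableSchema} and {@code HBaseTableSchemaV2}; the table name, index values and the
* {@code tableEnv} variable are illustrative assumptions, not values fixed by this class:
*
* <pre>{@code
* HBaseLookupFunction lookup = new HBaseLookupFunction(
*     tableSchema,                   // sql-level schema for the result row type
*     "my_hbase_table",              // hypothetical HBase table name
*     hbaseSchema,                   // rowkey type plus qualifier list
*     0,                             // rowkey index in the source row
*     java.util.Arrays.asList(1, 2), // qualifier indexes in the source row
*     HBaseConfiguration.create());  // client-side HBase configuration
* tableEnv.registerFunction("hbase_lookup", lookup);
* }</pre>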
*/
public class HBaseLookupFunction extends TableFunction<Row> {
private static final long serialVersionUID = 6552257501168286801L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
private final RichTableSchema tableSchema; // for type info
private final HBaseTableSchemaV2 hTableSchema;
// rowkey field index in source input row
private final int rowKeySourceIndex;
// qualifier fields' indexes in source input row
private final List<Integer> qualifierSourceIndexes;
private final String hTableName;
private List<Tuple3<byte[], byte[], TypeInformation<?>>> qualifierList;
// field serializer for HBase format, order by field index of input row
private List<HBaseBytesSerializer> inputFieldSerializers;
// TODO: currently this rowKey is always a binary type (adapted to the sql-level type system); refactor later
private int rowKeyInternalTypeIndex;
private int totalQualifiers;
private String charset;
private byte[] serializedConfig;
private transient Connection hConnection;
protected transient HTable table;
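
/**
* Creates the lookup function. The serializer list is built in source-row order: the rowkey
* serializer sits at {@code rowKeySourceIndex} and the flat qualifier list fills the remaining
* slots.
*
* @param tableSchema sql-level schema, used to derive the result row type
* @param hbaseTableName name of the HBase table to query
* @param hbaseSchema HBase-side schema: rowkey type plus the flat qualifier list
* @param rowKeySourceIndex index of the rowkey field in the source input row
* @param qualifierSourceIndexes indexes of the qualifier fields in the source input row
* @param hbaseConfigurations client-side HBase configuration; serialized here because
*        {@link Configuration} itself is not serializable
*/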
public HBaseLookupFunction(
RichTableSchema tableSchema,
String hbaseTableName,
HBaseTableSchemaV2 hbaseSchema,
int rowKeySourceIndex,
List<Integer> qualifierSourceIndexes,
Configuration hbaseConfigurations) throws IOException {
this.tableSchema = tableSchema;
this.hTableName = hbaseTableName;
this.hTableSchema = hbaseSchema;
// Configuration is not serializable
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hbaseConfigurations);
this.totalQualifiers = hbaseSchema.getFamilySchema().getTotalQualifiers();
Preconditions.checkArgument(null != qualifierSourceIndexes && totalQualifiers == qualifierSourceIndexes.size(),
"qualifierSourceIndexes info should be consistent with the qualifiers defined in HBaseTableSchema!");
Preconditions.checkArgument(rowKeySourceIndex > -1 && totalQualifiers > rowKeySourceIndex,
"rowKeySourceIndex must be greater than -1 and less than totalQualifiers!");
this.qualifierList = hbaseSchema.getFamilySchema().getFlatByteQualifiers();
this.rowKeySourceIndex = rowKeySourceIndex;
this.qualifierSourceIndexes = qualifierSourceIndexes;
this.charset = hbaseSchema.getFamilySchema().getStringCharset();
this.inputFieldSerializers = new ArrayList<>();
this.rowKeyInternalTypeIndex = HBaseTypeUtils.getTypeIndex(hbaseSchema.getRowKeyType());
// the source input row contains totalQualifiers + 1 fields: the rowkey plus all qualifiers
for (int index = 0; index <= totalQualifiers; index++) {
if (index == rowKeySourceIndex) {
inputFieldSerializers.add(new HBaseBytesSerializer(hbaseSchema.getRowKeyType(), charset));
} else {
// map the input row index to a position in the flat qualifier list,
// skipping the slot occupied by the rowkey
Tuple3<byte[], byte[], TypeInformation<?>> typeInfo;
if (index < rowKeySourceIndex) {
typeInfo = qualifierList.get(index);
} else {
typeInfo = qualifierList.get(index - 1);
}
inputFieldSerializers.add(new HBaseBytesSerializer(typeInfo.f2, charset));
}
}
}

@Override
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return tableSchema.getResultRowType();
}
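
/**
* Builds the HBase {@link Get} for a single lookup key: the key is serialized to its binary
* form, only the latest version is requested, and every qualifier declared in the schema is
* added as an explicit column.
*/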
protected Get createGet(Object rowKey) throws IOException {
// currently the rowKey is always a binary type (adapted to the sql-level type system)
byte[] rowkey = HBaseTypeUtils.serializeFromInternalObject(rowKey, rowKeyInternalTypeIndex, HBaseTypeUtils.DEFAULT_CHARSET);
Get get = new Get(rowkey);
get.setMaxVersions(1);
// add the requested columns
for (Tuple3<byte[], byte[], TypeInformation<?>> typeInfo : qualifierList) {
get.addColumn(typeInfo.f0, typeInfo.f1);
}
return get;
}
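
/**
* Lookup entry point, invoked once per probe-side key. An empty HBase result produces no
* output row, which gives the temporal join inner-join semantics for missing keys.
*/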
public void eval(Object rowKey) throws IOException {
// fetch result
Result result = table.get(createGet(rowKey));
if (!result.isEmpty()) {
// parse and collect
collect(parseResult(result, rowKey, rowKeySourceIndex));
}
}
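
/**
* Assembles the output row from an HBase {@link Result}: the lookup key is written back at
* {@code rowKeySourceIndex}, and each qualifier value is deserialized into its mapped
* source-row position via the per-field serializers.
*/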
protected Row parseResult(Result result, Object rowKey, int rowKeySourceIndex) throws UnsupportedEncodingException {
// output rowKey + qualifiers
Row row = new Row(totalQualifiers + 1);
row.setField(rowKeySourceIndex, rowKey);
for (int idx = 0; idx < totalQualifiers; idx++) {
Tuple3<byte[], byte[], TypeInformation<?>> qInfo = qualifierList.get(idx);
byte[] value = result.getValue(qInfo.f0, qInfo.f1);
int qualifierSrcIdx = qualifierSourceIndexes.get(idx);
row.setField(qualifierSrcIdx, inputFieldSerializers.get(qualifierSrcIdx).fromHBaseBytes(value));
}
return row;
}
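
/**
* Merges the client-side configuration over the runtime defaults. For instance, a client-side
* ZooKeeper quorum setting (the address below is a hypothetical example) survives the merge
* and overrides whatever the runtime `hbase-site.xml` provides:
*
* <pre>{@code
* Configuration clientConf = HBaseConfiguration.create();
* clientConf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host:2181"); // assumed quorum address
* }</pre>
*/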
protected org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
// create the default configuration from the current runtime env (`hbase-site.xml` on the classpath) first,
// then overwrite it with the serialized configuration shipped from the client-side env;
// user params from the client side have the highest priority
org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
// do validation: check key option(s) in final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
LOG.error("Cannot connect to HBase without the '{}' configuration.", HConstants.ZOOKEEPER_QUORUM);
throw new IOException("Check of the HBase configuration failed, missing key: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
}
return runtimeConfig;
}

@Override
public void open(FunctionContext context) throws Exception {
LOG.info("start open ...");
org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
try {
hConnection = ConnectionFactory.createConnection(config);
table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
} catch (TableNotFoundException tnfe) {
LOG.error("Table '{}' not found ", hTableName, tnfe);
throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
} catch (IOException ioe) {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
LOG.info("end open.");
}

@Override
public void close() {
LOG.info("start close ...");
if (null != table) {
try {
table.close();
} catch (IOException e) {
// ignore the exception on close
LOG.warn("Exception while closing the HBase table.", e);
}
}
if (null != hConnection) {
try {
hConnection.close();
} catch (IOException e) {
// ignore the exception on close
LOG.warn("Exception while closing the HBase connection.", e);
}
}
LOG.info("end close.");
}
}