/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.addons.hbase;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connectors.hbase.table.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.RowType;
import org.apache.flink.table.api.types.TypeConverters;
import org.apache.flink.table.api.types.TypeInfoWrappedDataType;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.util.TableSchemaUtil;
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

import scala.Option;

/**
* Creates a TableSource to scan an HBase table.
*
 * <p>The table name and required HBase configuration are passed during {@link HBaseTableSource} construction.
* Use {@link #addColumn(String, String, Class)} to specify the family, qualifier, and type of columns to scan.
*
 * <p>The TableSource returns {@link GenericRow} values with one nested row per column family.
*
* <p>The HBaseTableSource is used as shown in the example below.
*
* <pre>
* {@code
* HBaseTableSource hSrc = new HBaseTableSource(conf, "hTable");
* hSrc.addColumn("fam1", "col1", byte[].class);
* hSrc.addColumn("fam1", "col2", Integer.class);
* hSrc.addColumn("fam2", "col1", String.class);
*
* tableEnv.registerTableSource("hTable", hSrc);
* Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
* }
* </pre>
*
*/
public class HBaseTableSource implements BatchTableSource<GenericRow>, ProjectableTableSource {

	private Configuration conf;
	private String tableName;
	private HBaseTableSchema hBaseSchema;
	private TableSchema tableSchema;

	/**
	 * Creates an {@link HBaseTableSource} from the HBase configuration and the name of the table to read.
	 *
	 * @param conf the HBase configuration
	 * @param tableName the name of the HBase table to read
	 */
public HBaseTableSource(Configuration conf, String tableName) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table name");
this.hBaseSchema = new HBaseTableSchema();
}
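
	/**
	 * Used by {@link #projectFields(int[])} to create a projected copy that also keeps a
	 * reference to the original table schema.
	 */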
private HBaseTableSource(Configuration conf, String tableName, TableSchema tableSchema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table name");
this.hBaseSchema = new HBaseTableSchema();
this.tableSchema = tableSchema;
	}

/**
* Adds a column defined by family, qualifier, and type to the table schema.
*
* @param family the family name
* @param qualifier the qualifier name
* @param clazz the data type of the qualifier
*/
public void addColumn(String family, String qualifier, Class<?> clazz) {
this.hBaseSchema.addColumn(family, qualifier, clazz);
	}

	/**
	 * Specifies the charset used to serialize Strings into HBase byte[] keys and to parse
	 * byte[] values as Strings.
	 *
	 * @param charset name of the charset to use
	 */
public void setCharset(String charset) {
this.hBaseSchema.setCharset(charset);
}
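
	/**
	 * Returns the produced type: a {@link RowType} with one field per column family, where each
	 * field is itself a nested row holding that family's qualifiers.
	 */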
@Override
public RowType getReturnType() {
return new RowType(
Arrays.stream(getFieldTypes()).map((Function<TypeInformation, DataType>) TypeInfoWrappedDataType::new)
.toArray(DataType[]::new),
getFieldNames());
}
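
	/**
	 * Derives the {@link TableSchema} from the nested return type.
	 */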
@Override
public TableSchema getTableSchema() {
return TableSchemaUtil.fromDataType(getReturnType(), Option.empty());
}
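
	/**
	 * Returns the column family names; they serve as the top-level field names of the produced rows.
	 */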
private String[] getFieldNames() {
return hBaseSchema.getFamilyNames();
}
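
	/**
	 * Builds one nested row type per column family, with the family's qualifiers as its fields.
	 */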
	private TypeInformation[] getFieldTypes() {
		String[] famNames = hBaseSchema.getFamilyNames();
		TypeInformation[] fieldTypes = new TypeInformation[famNames.length];
		int i = 0;
		for (String family : famNames) {
			fieldTypes[i] = new BaseRowTypeInfo(hBaseSchema.getQualifierTypes(family), hBaseSchema.getQualifierNames(family));
			i++;
		}
		return fieldTypes;
	}
@Override
public DataStream<GenericRow> getBoundedStream(StreamExecutionEnvironment streamEnv) {
return streamEnv.createInput(
new HBaseRowInputFormat(conf, tableName, hBaseSchema),
(TypeInformation) TypeConverters.toBaseRowTypeInfo(getReturnType()),
explainSource());
}
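
	/**
	 * Table statistics are not available for HBase tables, so this always returns null.
	 */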
@Override
public TableStats getTableStats() {
return null;
}
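
	/**
	 * Projects this source onto the given top-level fields. Since the top-level fields are
	 * column families, projection happens at family granularity: every qualifier of a selected
	 * family is kept.
	 *
	 * <p>For example, assuming {@code fam1} and {@code fam2} from the class-level example were
	 * added in that order, the following keeps {@code fam1} with both of its qualifiers and
	 * drops {@code fam2}:
	 *
	 * <pre>
	 * {@code
	 * HBaseTableSource projected = hSrc.projectFields(new int[]{0});
	 * }
	 * </pre>
	 *
	 * @param fields indexes of the column families to keep
	 */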
@Override
public HBaseTableSource projectFields(int[] fields) {
String[] famNames = hBaseSchema.getFamilyNames();
HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName, getTableSchema());
		// each projected field selects a whole column family; copy all of its qualifiers
		for (int field : fields) {
			String family = famNames[field];
			Map<String, TypeInformation<?>> familyInfo = hBaseSchema.getFamilyInfo(family);
			for (Map.Entry<String, TypeInformation<?>> qualifier : familyInfo.entrySet()) {
				newTableSource.addColumn(family, qualifier.getKey(), qualifier.getValue().getTypeClass());
			}
		}
return newTableSource;
}
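
	/**
	 * Describes this source by its class name and the selected top-level fields
	 * (the column family names).
	 */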
@Override
public String explainSource() {
String fieldNames = (getFieldNames() == null) ? "*" : StringUtils.join(getFieldNames(), ", ");
return String.format("%s, selectedFields=[%s]", getClass().getCanonicalName(), fieldNames);
}
}