/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hbase.streaming;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hbase.table.HBaseTableSchemaV2;
import org.apache.flink.connectors.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * A base stream writer for HBase; a subclass is commonly expected to override the
 * {@code invoke} method of the sink function. An example:
 *
 * <pre>{@code
 * @Override
 * public void invoke(T value, Context context) throws Exception {
 *     // convert T to an HBase Mutation (Put/Delete)
 *     ...
 *
 *     // then call the hbase-client API to send the mutation request to the (remote) region server.
 *     htable.put/delete ...
 *
 *     // or apply write optimizations such as batched puts or async I/O; in that case the subclass
 *     // should also implement org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 *     // and flush pending mutations in `snapshotState`.
 * }
 * }</pre>
 *
 * A minimal sketch of such a subclass is appended at the end of this file.
 *
 * Configuration precedence (highest first):
 * user-constructed configuration from the client > `hbase-site.xml` on the client classpath > `hbase-site.xml` on the runtime classpath.
 */
public abstract class HBaseWriterBase<T> extends RichSinkFunction<T> {
private static final long serialVersionUID = 6237464944162580678L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterBase.class);
protected String hTableName;
protected HBaseTableSchemaV2 hTableSchema;
private byte[] serializedConfig;
private transient Connection hConnection;
protected transient HTable table;
public HBaseWriterBase(String hTableName, HBaseTableSchemaV2 hTableSchema) throws IOException {
// serialize default HBaseConfiguration from client's env
this(hTableName, hTableSchema, HBaseConfiguration.create());
}
public HBaseWriterBase(String hTableName, HBaseTableSchemaV2 hTableSchema, org.apache.hadoop.conf.Configuration conf) throws IOException {
this.hTableName = hTableName;
this.hTableSchema = hTableSchema;
// Configuration is not serializable
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
}
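// Usage sketch (hypothetical values): pass a custom Configuration to override settings
// picked up from `hbase-site.xml`, e.g.
//
// org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
// conf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host1:2181,zk-host2:2181");
// new MyHBaseWriter(tableName, schema, conf); // `MyHBaseWriter` is a hypothetical subclass
//
// Per the precedence described in the class Javadoc, values set this way win over
// both the client-side and the runtime-side `hbase-site.xml`.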
private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
// Create the default configuration from the current runtime env (`hbase-site.xml` on the runtime classpath) first,
// then overwrite it with the serialized configuration from the client-side env (`hbase-site.xml` on the client classpath).
// User params set on the client side have the highest priority.
org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
// validation: check the required option(s) in the final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
LOG.error("Cannot connect to HBase without '{}' configured.", HConstants.ZOOKEEPER_QUORUM);
throw new IOException("Check of HBase configuration failed: missing '" + HConstants.ZOOKEEPER_QUORUM + "'!");
}
return runtimeConfig;
}
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("start open ...");
org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
try {
if (null == hConnection) {
hConnection = ConnectionFactory.createConnection(config);
}
table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
} catch (TableNotFoundException tnfe) {
LOG.error("The table " + hTableName + " not found ", tnfe);
throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
} catch (IOException ioe) {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
LOG.info("end open.");
}
@Override
public void close() {
LOG.info("start close ...");
if (null != table) {
try {
table.close();
} catch (IOException e) {
// ignore exceptions on close.
LOG.warn("Exception while closing the table.", e);
}
}
if (null != hConnection) {
try {
hConnection.close();
} catch (IOException e) {
// ignore exceptions on close.
LOG.warn("Exception while closing the connection.", e);
}
}
LOG.info("end close.");
}
}
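
/*
 * A minimal sketch of a concrete writer, illustrating the `invoke` override described
 * in the class Javadoc above. It assumes the incoming records are already fully-built
 * HBase Puts; a real implementation would convert its own record type to a Mutation.
 * The class name is hypothetical and not part of the connector.
 */
class HBasePutWriterExample extends HBaseWriterBase<org.apache.hadoop.hbase.client.Put> {
private static final long serialVersionUID = 1L;
HBasePutWriterExample(String hTableName, HBaseTableSchemaV2 hTableSchema) throws IOException {
super(hTableName, hTableSchema);
}
@Override
public void invoke(org.apache.hadoop.hbase.client.Put value, Context context) throws Exception {
// send the pre-built mutation to the (remote) region server via the hbase-client API
table.put(value);
}
}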