blob: 6d3c2c469d6c683783875d4d590f17c099ec64d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.hbase;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.generated.Bolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HTable connector for Storm {@link Bolt}.
 * <p>
 * The HBase configuration is created with {@link HBaseConfiguration#create()}, i.e. it is picked up
 * from the first {@code hbase-site.xml} encountered in the classpath. The ZooKeeper quorum and
 * client port may be overridden through the constructor.
 * <p>
 * NOTE(review): the class is {@link Serializable} but neither the HBase configuration nor the table
 * handle is marked {@code transient}; confirm whether instances are ever actually serialized.
 */
@SuppressWarnings("serial")
public class HTableConnector extends Connector implements Serializable {
  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** HBase client configuration used to open the table. */
  private final Configuration conf;
  /** Handle to the HBase table; opened in the constructor and released by {@link #close()}. */
  protected HTableInterface table;
  /** Name of the HBase table this connector writes to. */
  private final String tableName;
  /** Fully qualified class name of the {@link TableProvider} to use, or null/empty for the default. */
  private final String connectorImpl;

  /**
   * Initialize the HTable connection.
   * <p>
   * Opens the table, applies the batching / write-buffer settings from {@code conf} and verifies
   * that every configured column family exists. If any step after opening the table fails, the
   * table is closed again before the exception propagates so the connection is not leaked.
   *
   * @param conf The {@link TableConfig} describing the table, column families and write settings
   * @param _quorum ZooKeeper quorum to use; only applied when both {@code _quorum} and
   *        {@code _port} are non-null
   * @param _port ZooKeeper client port to use; only applied when both {@code _quorum} and
   *        {@code _port} are non-null
   * @throws IOException if the table provider cannot be instantiated, the table cannot be opened,
   *         or the table descriptor cannot be read
   * @throws RuntimeException if a configured column family does not exist in the table
   */
  public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
    super(conf, _quorum, _port);
    // connectorImpl must be assigned before getTableProvider() is invoked below.
    this.connectorImpl = conf.getConnectorImpl();
    this.tableName = conf.getTableName();
    this.conf = HBaseConfiguration.create();
    if (_quorum != null && _port != null) {
      this.conf.set("hbase.zookeeper.quorum", _quorum);
      this.conf.set("hbase.zookeeper.property.clientPort", _port);
    }
    LOG.info("Initializing connection to HBase table {} at {}", tableName,
        this.conf.get("hbase.rootdir"));
    try {
      this.table = getTableProvider().getTable(this.conf, this.tableName);
    } catch (IOException ex) {
      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
    }

    // From here on the table is open: release it again if initialization does not complete,
    // because the caller never receives an instance it could close.
    boolean initialized = false;
    try {
      if (conf.isBatch()) {
        // Enable client-side write buffer
        this.table.setAutoFlush(false, true);
        LOG.info("Enabled client-side write buffer");
      }
      // If set, override write buffer size
      if (conf.getWriteBufferSize() > 0) {
        try {
          this.table.setWriteBufferSize(conf.getWriteBufferSize());
          LOG.info("Setting client-side write buffer to {}", conf.getWriteBufferSize());
        } catch (IOException ex) {
          // Best effort: a failure to resize the buffer is logged but does not abort start-up.
          LOG.error("Unable to set client-side write buffer size for HBase table {}", this.tableName,
              ex);
        }
      }
      // Check the configured column families exist
      for (String cf : conf.getColumnFamilies()) {
        if (!columnFamilyExists(cf)) {
          throw new RuntimeException(String.format(
              "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
        }
      }
      initialized = true;
    } finally {
      if (!initialized) {
        closeTable();
      }
    }
  }

  /**
   * Resolves the {@link TableProvider} used to open the table.
   * <p>
   * Falls back to {@link HTableProvider} when no implementation is configured or when the
   * configured value starts with {@code '$'} (presumably an unresolved property placeholder --
   * TODO confirm). Otherwise the configured class is loaded and instantiated through its public
   * no-argument constructor.
   *
   * @return the table provider to use
   * @throws IOException if the configured class cannot be found, is not a {@link TableProvider},
   *         or cannot be instantiated
   */
  protected TableProvider getTableProvider() throws IOException {
    if (connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
      return new HTableProvider();
    }
    try {
      // asSubclass performs the type check up front instead of relying on an unchecked cast.
      Class<? extends TableProvider> clazz =
          Class.forName(connectorImpl).asSubclass(TableProvider.class);
      return clazz.getConstructor().newInstance();
    } catch (ClassCastException e) {
      throw new IOException(
          "Unable to instantiate connector: " + connectorImpl + " is not a TableProvider", e);
    } catch (InstantiationException e) {
      throw new IOException("Unable to instantiate connector.", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Unable to instantiate connector: illegal access", e);
    } catch (InvocationTargetException e) {
      throw new IOException("Unable to instantiate connector", e);
    } catch (NoSuchMethodException e) {
      throw new IOException("Unable to instantiate connector: no such method", e);
    } catch (ClassNotFoundException e) {
      throw new IOException("Unable to instantiate connector: class not found", e);
    }
  }

  /**
   * Checks to see if table contains the given column family
   * @param columnFamily The column family name
   * @return {@code true} if the table descriptor reports the family, {@code false} otherwise
   * @throws IOException if the table descriptor cannot be retrieved
   */
  private boolean columnFamilyExists(final String columnFamily) throws IOException {
    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
  }

  /**
   * Closes the table if one is open, logging (rather than propagating) any {@link IOException}.
   */
  private void closeTable() {
    if (this.table == null) {
      return;
    }
    try {
      this.table.close();
    } catch (IOException ex) {
      LOG.error("Unable to close connection to HBase table {}", tableName, ex);
    }
  }

  /**
   * @return the table
   */
  public HTableInterface getTable() {
    return table;
  }

  /**
   * Writes the given {@link Put} to the table.
   *
   * @param put the mutation to apply
   * @throws IOException if the underlying table rejects or fails the write
   */
  @Override
  public void put(Put put) throws IOException {
    table.put(put);
  }

  /**
   * Close the table. Failures are logged and not rethrown; calling this when no table is open is
   * a no-op.
   */
  @Override
  public void close() {
    closeTable();
  }
}