blob: ec860a54bb1faa8333fd68e19c6aaa13d36b50b6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.hbase.bolt;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.hbase.ColumnList;
import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
import org.apache.metron.hbase.client.HBaseClient;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A bolt that writes to HBase.
*
* Each bolt defined within a topology can interact with only a single HBase table.
*/
public class HBaseBolt extends BaseRichBolt {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Should the write-ahead-log be used.
   */
  private boolean writeToWAL = true;

  /**
   * The interval in seconds at which the writes are flushed to HBase.
   */
  private int flushIntervalSecs = 1;

  /**
   * The batch size.
   */
  private int batchSize = 1000;

  /**
   * The name of the HBase table. Each bolt communicates with a single HBase table.
   */
  protected String tableName;

  /**
   * The mapper which defines how tuple fields are mapped to HBase.
   */
  protected HBaseMapper mapper;

  /**
   * The name of the class that should be used as a table provider.
   *
   * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'. Only consulted when no
   * {@link #tableProvider} instance has been supplied.
   */
  protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider";

  /**
   * An explicitly supplied TableProvider (see {@link #withTableProviderInstance(TableProvider)}).
   *
   * <p>When non-null it takes precedence over {@link #tableProviderClazzName}. When null, a
   * provider is instantiated from {@link #tableProviderClazzName} in {@link #prepare}; that
   * instance is used for the client but is not stored back into this field.
   */
  protected TableProvider tableProvider;

  private BatchHelper batchHelper;
  protected OutputCollector collector;
  protected transient HBaseClient hbaseClient;

  /**
   * @param tableName The HBase table this bolt writes to.
   * @param mapper Defines how tuple fields map to HBase row keys and columns.
   */
  public HBaseBolt(String tableName, HBaseMapper mapper) {
    this.tableName = tableName;
    this.mapper = mapper;
  }

  /**
   * Sets whether mutations are written with {@code Durability.SYNC_WAL} (true) or
   * {@code Durability.SKIP_WAL} (false).
   *
   * @return this bolt, for chaining
   */
  public HBaseBolt writeToWAL(boolean writeToWAL) {
    this.writeToWAL = writeToWAL;
    return this;
  }

  /**
   * Sets the fully-qualified class name of the TableProvider to instantiate in {@link #prepare}.
   *
   * @return this bolt, for chaining
   */
  public HBaseBolt withTableProvider(String tableProvider) {
    this.tableProviderClazzName = tableProvider;
    return this;
  }

  /**
   * Supplies a TableProvider instance directly; it takes precedence over any class name.
   *
   * @return this bolt, for chaining
   */
  public HBaseBolt withTableProviderInstance(TableProvider tableProvider) {
    this.tableProvider = tableProvider;
    return this;
  }

  /**
   * Sets the batch size handed to the BatchHelper in {@link #prepare}.
   *
   * @return this bolt, for chaining
   */
  public HBaseBolt withBatchSize(int batchSize) {
    this.batchSize = batchSize;
    return this;
  }

  /**
   * Sets the tick-tuple frequency, in seconds, requested in {@link #getComponentConfiguration()}.
   *
   * @return this bolt, for chaining
   */
  public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
    this.flushIntervalSecs = flushIntervalSecs;
    return this;
  }

  /**
   * Overrides the HBase client. Note that {@link #prepare} unconditionally replaces the client,
   * so a client set before {@code prepare} runs will not survive it.
   */
  public void setClient(HBaseClient hbaseClient) {
    this.hbaseClient = hbaseClient;
  }

  /**
   * Requests tick tuples every {@code flushIntervalSecs} seconds.
   */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs);
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
    return conf;
  }

  /**
   * Creates the batch helper and the HBase client. An explicitly supplied
   * {@link #tableProvider} is used when present; otherwise one is created from
   * {@link #tableProviderClazzName}.
   */
  @Override
  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
    this.collector = collector;
    this.batchHelper = new BatchHelper(batchSize, collector);

    TableProvider provider = (this.tableProvider != null)
        ? this.tableProvider
        : createTableProvider(tableProviderClazzName);

    hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // nothing emitted
  }

  /**
   * Queues the tuple's mutation if the batch helper says it should be handled, then flushes
   * if the batch helper says a flush is due. On any failure the batch helper is told to fail
   * and all pending client mutations are discarded.
   */
  @Override
  public void execute(Tuple tuple) {
    LOG.trace("Received a tuple.");
    try {
      if (batchHelper.shouldHandle(tuple)) {
        save(tuple);
      }

      if (batchHelper.shouldFlush()) {
        flush();
      }

    } catch (Exception e) {
      batchHelper.fail(e);
      hbaseClient.clearMutations();
    }
  }

  /**
   * Saves an operation for later.
   *
   * @param tuple Contains the data elements that need to be written to HBase.
   */
  private void save(Tuple tuple) {
    byte[] rowKey = mapper.rowKey(tuple);
    ColumnList cols = mapper.columns(tuple);
    Durability durability = writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL;
    Optional<Long> ttl = mapper.getTTL(tuple);
    if (ttl.isPresent()) {
      hbaseClient.addMutation(rowKey, cols, durability, ttl.get());
    } else {
      hbaseClient.addMutation(rowKey, cols, durability);
    }

    batchHelper.addBatch(tuple);
    LOG.debug("Added mutation to the batch; size={}", batchHelper.getBatchSize());
  }

  /**
   * Flush all saved operations, then ack the batch.
   */
  private void flush() {
    LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize());
    this.hbaseClient.mutate();
    batchHelper.ack();
  }

  /**
   * Creates a TableProvider based on a class name.
   *
   * @param connectorImpl The fully-qualified class name of a TableProvider. If empty, null, or
   *     starting with '$', an {@link HTableProvider} is returned instead.
   * @return a new TableProvider instance created via the class's public no-arg constructor
   * @throws IllegalStateException if the class cannot be found, is not a TableProvider, or
   *     cannot be instantiated
   */
  private static TableProvider createTableProvider(String connectorImpl) {
    LOG.trace("Creating table provider; className={}", connectorImpl);

    // if class name not defined, use a reasonable default.
    // NOTE(review): a leading '$' presumably means an unresolved "${...}" property placeholder.
    if (StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {
      return new HTableProvider();
    }

    // instantiate the table provider
    try {
      // asSubclass type-checks eagerly: a class that is not a TableProvider fails here with a
      // ClassCastException instead of slipping through an unchecked cast.
      Class<? extends TableProvider> clazz =
          Class.forName(connectorImpl).asSubclass(TableProvider.class);
      return clazz.getConstructor().newInstance();

    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException(
          "Unable to instantiate connector; className=" + connectorImpl, e);
    }
  }
}