blob: a145274b187dfb121e8c6b04689a103c550832c9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm.hdfs.bolt;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.NullPartitioner;
import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractHdfsBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
private static final Integer DEFAULT_RETRY_COUNT = 3;
* Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
protected Map<String, Writer> writers;
protected Map<String, Integer> rotationCounterMap = new HashMap<>();
protected List<RotationAction> rotationActions = new ArrayList<>();
protected OutputCollector collector;
protected transient FileSystem fs;
protected SyncPolicy syncPolicy;
protected FileRotationPolicy rotationPolicy;
protected FileNameFormat fileNameFormat;
protected String fsUrl;
protected String configKey;
protected transient Object writeLock;
protected transient Timer rotationTimer; // only used for TimedRotationPolicy
protected long offset = 0;
protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
protected Partitioner partitioner = new NullPartitioner();
protected transient Configuration hdfsConfig;
private List<Tuple> tupleBatch = new LinkedList<>();
protected void rotateOutputFile(Writer writer) throws IOException {"Rotating output file...");
long start = System.currentTimeMillis();
synchronized (this.writeLock) {
writer.close();"Performing {} file rotation actions.", this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
action.execute(this.fs, writer.getFilePath());
long time = System.currentTimeMillis() - start;"File rotation took {} ms.", time);
* Marked as final to prevent override. Subclasses should implement the doPrepare() method.
public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
this.writeLock = new Object();
if (this.syncPolicy == null) {
throw new IllegalStateException("SyncPolicy must be specified.");
if (this.rotationPolicy == null) {
throw new IllegalStateException("RotationPolicy must be specified.");
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
writers = new WritersMap(this.maxOpenFiles, collector);
this.collector = collector;
this.fileNameFormat.prepare(conf, topologyContext);
this.hdfsConfig = new Configuration();
Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey);
if (map != null) {
for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
try {
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, topologyContext, collector);
} catch (Exception e) {
throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
if (this.rotationPolicy instanceof TimedRotationPolicy) {
public final void execute(Tuple tuple) {
synchronized (this.writeLock) {
boolean forceSync = false;
Writer writer = null;
String writerKey = null;
if (TupleUtils.isTick(tuple)) {
LOG.debug("TICK! forcing a file system flush");
forceSync = true;
} else {
writerKey = getHashKeyForTuple(tuple);
try {
writer = getOrCreateWriter(writerKey, tuple);
this.offset = writer.write(tuple);
} catch (IOException e) {
//If the write failed, try to sync anything already written"Tuple failed to write, forcing a flush of existing data.");
forceSync = true;;
if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
int attempts = 0;
boolean success = false;
IOException lastException = null;
// Make every attempt to sync the data we have. If it can't be done then kill the bolt with
// a runtime exception. The filesystem is presumably in a very bad state.
while (success == false && attempts < fileRetryCount) {
attempts += 1;
try {
LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
for (Tuple t : tupleBatch) {
success = true;
} catch (IOException e) {
LOG.warn("Data could not be synced to filesystem on attempt [{}]", attempts);
lastException = e;
// If unsuccesful fail the pending tuples
if (success == false) {
LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
for (Tuple t : tupleBatch) {;
throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
if (writer != null && writer.needsRotation()) {
doRotationAndRemoveWriter(writerKey, writer);
private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
Writer writer;
writer = writers.get(writerKey);
if (writer == null) {
Path pathForNextFile = getBasePathForNextFile(tuple);
writer = makeNewWriter(pathForNextFile, tuple);
writers.put(writerKey, writer);
return writer;
* A tuple must be mapped to a writer based on two factors.
* - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
* for an example of this)
* - the directory the tuple will be partioned into
private String getHashKeyForTuple(Tuple tuple) {
final String boltKey = getWriterKey(tuple);
final String partitionDir = this.partitioner.getPartitionPath(tuple);
return boltKey + "****" + partitionDir;
void doRotationAndRemoveWriter(String writerKey, Writer writer) {
try {
} catch (IOException e) {
LOG.error("File could not be rotated");
//At this point there is nothing to do. In all likelihood any filesystem operations will fail.
//The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
//will give rotateAndReset() a chance to work which includes creating a fresh file handle.
} finally {
//rotateOutputFile(writer) has closed the writer. It's safe to remove the writer from the map here.
public Map<String, Object> getComponentConfiguration() {
return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval);
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
public void cleanup() {
if (this.rotationTimer != null) {
private void doRotationAndRemoveAllWriters() {
synchronized (writeLock) {
for (final Writer writer : writers.values()) {
try {
} catch (IOException e) {
LOG.warn("IOException during scheduled file rotation.", e);
//above for-loop has closed all the writers. It's safe to clear the map here.
private void syncAllWriters() throws IOException {
for (Writer writer : writers.values()) {
private void startTimedRotationPolicy() {
long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
this.rotationTimer = new Timer(true);
TimerTask task = new TimerTask() {
public void run() {
this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
protected Path getBasePathForNextFile(Tuple tuple) {
final String partitionPath = this.partitioner.getPartitionPath(tuple);
final int rotation;
if (rotationCounterMap.containsKey(partitionPath)) {
rotation = rotationCounterMap.get(partitionPath) + 1;
} else {
rotation = 0;
rotationCounterMap.put(partitionPath, rotation);
return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath,
this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
protected abstract void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws
protected abstract String getWriterKey(Tuple tuple);
protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
static class WritersMap extends LinkedHashMap<String, Writer> {
final long maxWriters;
final OutputCollector collector;
public WritersMap(long maxWriters, OutputCollector collector) {
super((int) maxWriters, 0.75f, true);
this.maxWriters = maxWriters;
this.collector = collector;
protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
if (this.size() > this.maxWriters) {
//The writer must be closed before removed from the map.
//If it failed, we might lose some data.
try {
} catch (IOException e) {
LOG.error("Failed to close the eldest Writer");
return true;
} else {
return false;