blob: 01a446c9353c54c26f0b0610cda61bc3c5ac50bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.hdfs.bolt;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.yaml.snakeyaml.Yaml;
public class HdfsFileTopology {
// Component id under which the sentence spout is registered in the topology.
static final String SENTENCE_SPOUT_ID = "sentence-spout";
// Component id under which the HDFS bolt is registered in the topology.
static final String BOLT_ID = "my-bolt";
// Topology name used when no name is supplied as the third command-line argument.
static final String TOPOLOGY_NAME = "test-topology";
/**
 * Builds the sample topology (one {@link SentenceSpout} feeding an {@link HdfsBolt}) and submits it.
 *
 * <p>Usage: {@code HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>}
 *
 * @param args {@code args[0]} is the HDFS URL, {@code args[1]} the path of a YAML file whose content is
 *             stored in the topology config under {@code "hdfs.config"}, and the optional
 *             {@code args[2]} the topology name (defaults to {@link #TOPOLOGY_NAME})
 * @throws Exception if the YAML file cannot be read or the topology cannot be submitted
 */
public static void main(String[] args) throws Exception {
    // Validate before touching args[0]/args[1] or the file system, so a wrong invocation
    // prints the usage text instead of failing with an ArrayIndexOutOfBoundsException.
    if (args.length < 2 || args.length > 3) {
        System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
        return;
    }
    String topoName = (args.length == 3) ? args[2] : TOPOLOGY_NAME;

    Config config = new Config();
    config.setNumWorkers(1);
    SentenceSpout spout = new SentenceSpout();
    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // rotate files once per minute
    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/tmp/foo/")
        .withExtension(".txt");
    // use "|" instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");
    // the bolt is pointed at this entry through withConfigKey("hdfs.config") below
    config.put("hdfs.config", loadYamlConfig(args[1]));
    HdfsBolt bolt = new HdfsBolt()
        .withConfigKey("hdfs.config")
        .withFsUrl(args[0])
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
    // SentenceSpout --> HdfsBolt
    builder.setBolt(BOLT_ID, bolt, 4)
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}

/**
 * Parses the YAML file at {@code path} into a map, closing the stream even when parsing fails.
 *
 * @param path location of the YAML file on the local file system
 * @return the parsed document as returned by the YAML parser
 * @throws java.io.IOException if the file cannot be opened or closed
 */
private static Map<String, Object> loadYamlConfig(String path) throws java.io.IOException {
    Yaml yaml = new Yaml();
    try (InputStream in = new FileInputStream(path)) {
        // NOTE(review): Yaml.load on an operator-supplied file; the file is assumed to be trusted
        // and to contain a top-level mapping - confirm before feeding it external input.
        @SuppressWarnings("unchecked")
        Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
        return yamlConf;
    }
}
/**
 * Pauses the calling thread for the given number of seconds.
 *
 * <p>If the thread is interrupted while sleeping, the method returns early and leaves the
 * thread's interrupt status set so that callers can still observe the interruption.
 *
 * @param seconds how long to sleep, in seconds; the value is handed to {@link Thread#sleep(long)}
 *                after conversion to milliseconds, so a negative value is rejected there
 */
public static void waitForSeconds(int seconds) {
    try {
        // Multiply as long: an int product overflows for large values and turns negative.
        Thread.sleep(seconds * 1000L);
    } catch (InterruptedException e) {
        // Thread.sleep cleared the flag when it threw; restore it instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Test spout that cycles endlessly through a fixed list of sentences.
 *
 * <p>Every tuple carries the sentence and the emit time in milliseconds and is emitted with a
 * random {@link UUID} as message id. Emitted tuples are remembered until they are acked so that
 * a failed tuple can be emitted again under the same id.
 */
public static class SentenceSpout extends BaseRichSpout {
    /** Tuples emitted but not yet acked, keyed by their message id. */
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    /** Position in {@link #sentences} of the next sentence to emit. */
    private int index = 0;
    /** Tuples emitted since the last progress line was printed. */
    private int count = 0;
    /** Tuples emitted since the spout was created. */
    private long total = 0L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence", "timestamp"));
    }

    @Override
    public void open(Map<String, Object> config, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<>();
    }

    /**
     * Emits the next sentence with a fresh message id and records it as pending.
     */
    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index], System.currentTimeMillis());
        UUID msgId = UUID.randomUUID();
        // record before emitting so ack()/fail() always find the entry
        this.pending.put(msgId, values);
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        count++;
        total++;
        // print a progress line once the counter exceeds 20000, then start counting again
        if (count > 20000) {
            count = 0;
            System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
        }
        Thread.yield();
    }

    /**
     * Forgets the tuple stored under {@code msgId}.
     */
    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    /**
     * Emits the tuple stored under {@code msgId} again, keeping the same message id.
     * When no tuple is stored under that id there is nothing to replay, so the call only logs.
     */
    @Override
    public void fail(Object msgId) {
        Values values = this.pending.get(msgId);
        if (values == null) {
            // Avoid handing a null tuple to the collector.
            System.out.println("**** NO PENDING TUPLE FOR FAILED MESSAGE " + msgId);
            return;
        }
        System.out.println("**** RESENDING FAILED TUPLE");
        this.collector.emit(values, msgId);
    }
}
/**
 * Minimal bolt that acknowledges every tuple it receives and declares no output fields.
 */
public static class MyBolt extends BaseRichBolt {
    private OutputCollector collector;
    // allocated in prepare(); not read anywhere in this class
    private HashMap<String, Long> counts;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to declare: this bolt only acknowledges its input
    }

    @Override
    public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        this.collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // no resources to release
    }
}
}