| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.hdfs.bolt; |
| |
| import java.io.FileInputStream; |
| import java.io.InputStream; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.storm.Config; |
| import org.apache.storm.StormSubmitter; |
| import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; |
| import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat; |
| import org.apache.storm.hdfs.bolt.format.FileNameFormat; |
| import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; |
| import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; |
| import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units; |
| import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; |
| import org.apache.storm.hdfs.bolt.sync.SyncPolicy; |
| import org.apache.storm.hdfs.common.rotation.MoveFileAction; |
| import org.apache.storm.spout.SpoutOutputCollector; |
| import org.apache.storm.task.OutputCollector; |
| import org.apache.storm.task.TopologyContext; |
| import org.apache.storm.topology.OutputFieldsDeclarer; |
| import org.apache.storm.topology.TopologyBuilder; |
| import org.apache.storm.topology.base.BaseRichBolt; |
| import org.apache.storm.topology.base.BaseRichSpout; |
| import org.apache.storm.tuple.Fields; |
| import org.apache.storm.tuple.Tuple; |
| import org.apache.storm.tuple.Values; |
| import org.yaml.snakeyaml.Yaml; |
| |
| public class SequenceFileTopology { |
| static final String SENTENCE_SPOUT_ID = "sentence-spout"; |
| static final String BOLT_ID = "my-bolt"; |
| static final String TOPOLOGY_NAME = "test-topology"; |
| |
| public static void main(String[] args) throws Exception { |
| Config config = new Config(); |
| config.setNumWorkers(1); |
| |
| SentenceSpout spout = new SentenceSpout(); |
| |
| // sync the filesystem after every 1k tuples |
| SyncPolicy syncPolicy = new CountSyncPolicy(1000); |
| |
| // rotate files when they reach 5MB |
| FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); |
| |
| FileNameFormat fileNameFormat = new DefaultFileNameFormat() |
| .withPath("/tmp/source/") |
| .withExtension(".seq"); |
| |
| // create sequence format instance. |
| DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence"); |
| |
| Yaml yaml = new Yaml(); |
| InputStream in = new FileInputStream(args[1]); |
| Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in); |
| in.close(); |
| config.put("hdfs.config", yamlConf); |
| |
| SequenceFileBolt bolt = new SequenceFileBolt() |
| .withFsUrl(args[0]) |
| .withConfigKey("hdfs.config") |
| .withFileNameFormat(fileNameFormat) |
| .withSequenceFormat(format) |
| .withRotationPolicy(rotationPolicy) |
| .withSyncPolicy(syncPolicy) |
| .withCompressionType(SequenceFile.CompressionType.RECORD) |
| .withCompressionCodec("deflate") |
| .addRotationAction(new MoveFileAction().toDestination("/tmp/dest/")); |
| |
| TopologyBuilder builder = new TopologyBuilder(); |
| |
| builder.setSpout(SENTENCE_SPOUT_ID, spout, 1); |
| // SentenceSpout --> MyBolt |
| builder.setBolt(BOLT_ID, bolt, 4) |
| .shuffleGrouping(SENTENCE_SPOUT_ID); |
| |
| String topoName = TOPOLOGY_NAME; |
| if (args.length == 3) { |
| topoName = args[2]; |
| } else if (args.length > 3) { |
| System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>"); |
| return; |
| } |
| StormSubmitter.submitTopology(topoName, config, builder.createTopology()); |
| } |
| |
| public static void waitForSeconds(int seconds) { |
| try { |
| Thread.sleep(seconds * 1000); |
| } catch (InterruptedException e) { |
| //ignore |
| } |
| } |
| |
| |
| public static class SentenceSpout extends BaseRichSpout { |
| |
| |
| private ConcurrentHashMap<UUID, Values> pending; |
| private SpoutOutputCollector collector; |
| private String[] sentences = { |
| "my dog has fleas", |
| "i like cold beverages", |
| "the dog ate my homework", |
| "don't have a cow man", |
| "i don't think i like fleas" |
| }; |
| private int index = 0; |
| private int count = 0; |
| private long total = 0L; |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields("sentence", "timestamp")); |
| } |
| |
| @Override |
| public void open(Map<String, Object> config, TopologyContext context, |
| SpoutOutputCollector collector) { |
| this.collector = collector; |
| this.pending = new ConcurrentHashMap<UUID, Values>(); |
| } |
| |
| @Override |
| public void nextTuple() { |
| Values values = new Values(sentences[index], System.currentTimeMillis()); |
| UUID msgId = UUID.randomUUID(); |
| this.pending.put(msgId, values); |
| this.collector.emit(values, msgId); |
| index++; |
| if (index >= sentences.length) { |
| index = 0; |
| } |
| count++; |
| total++; |
| if (count > 20000) { |
| count = 0; |
| System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); |
| } |
| Thread.yield(); |
| } |
| |
| @Override |
| public void ack(Object msgId) { |
| // System.out.println("ACK"); |
| this.pending.remove(msgId); |
| } |
| |
| @Override |
| public void fail(Object msgId) { |
| System.out.println("**** RESENDING FAILED TUPLE"); |
| this.collector.emit(this.pending.get(msgId), msgId); |
| } |
| } |
| |
| |
| public static class MyBolt extends BaseRichBolt { |
| |
| private HashMap<String, Long> counts = null; |
| private OutputCollector collector; |
| |
| @Override |
| public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) { |
| this.counts = new HashMap<String, Long>(); |
| this.collector = collector; |
| } |
| |
| @Override |
| public void execute(Tuple tuple) { |
| collector.ack(tuple); |
| } |
| |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| // this bolt does not emit anything |
| } |
| |
| @Override |
| public void cleanup() { |
| } |
| } |
| } |