/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
package org.apache.storm.hive.bolt;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

| |
| public class HiveTopology { |
| static final String USER_SPOUT_ID = "user-spout"; |
| static final String BOLT_ID = "my-hive-bolt"; |
| static final String TOPOLOGY_NAME = "hive-test-topology1"; |
| |
| public static void main(String[] args) throws Exception { |
| String metaStoreUri = args[0]; |
| String dbName = args[1]; |
| String tblName = args[2]; |
| String[] colNames = {"id","name","phone","street","city","state"}; |
| Config config = new Config(); |
| config.setNumWorkers(1); |
| UserDataSpout spout = new UserDataSpout(); |
| DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() |
| .withColumnFields(new Fields(colNames)); |
| HiveOptions hiveOptions; |
| if (args.length == 6) { |
| hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) |
| .withTxnsPerBatch(10) |
| .withBatchSize(100) |
| .withIdleTimeout(10) |
| .withKerberosKeytab(args[4]) |
| .withKerberosPrincipal(args[5]); |
| } else { |
| hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) |
| .withTxnsPerBatch(10) |
| .withBatchSize(100) |
| .withIdleTimeout(10) |
| .withMaxOpenConnections(1); |
| } |
| |
| HiveBolt hiveBolt = new HiveBolt(hiveOptions); |
| TopologyBuilder builder = new TopologyBuilder(); |
| builder.setSpout(USER_SPOUT_ID, spout, 1); |
| // SentenceSpout --> MyBolt |
| builder.setBolt(BOLT_ID, hiveBolt, 1) |
| .shuffleGrouping(USER_SPOUT_ID); |
| |
| String topoName = TOPOLOGY_NAME; |
| if (args.length >= 4) { |
| topoName = args[3]; |
| } |
| StormSubmitter.submitTopology(topoName, config, builder.createTopology()); |
| } |
| |
| public static void waitForSeconds(int seconds) { |
| try { |
| Thread.sleep(seconds * 1000); |
| } catch (InterruptedException e) { |
| //ignore |
| } |
| } |
| |
| public static class UserDataSpout extends BaseRichSpout { |
| private ConcurrentHashMap<UUID, Values> pending; |
| private SpoutOutputCollector collector; |
| private String[] sentences = { |
| "1,user1,123456,street1,sunnyvale,ca", |
| "2,user2,123456,street2,sunnyvale,ca", |
| "3,user3,123456,street3,san jose,ca", |
| "4,user4,123456,street4,san jose,ca", |
| }; |
| private int index = 0; |
| private int count = 0; |
| private long total = 0L; |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields("id","name","phone","street","city","state")); |
| } |
| |
| @Override |
| public void open(Map<String, Object> config, TopologyContext context, |
| SpoutOutputCollector collector) { |
| this.collector = collector; |
| this.pending = new ConcurrentHashMap<UUID, Values>(); |
| } |
| |
| @Override |
| public void nextTuple() { |
| String[] user = sentences[index].split(","); |
| Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]); |
| UUID msgId = UUID.randomUUID(); |
| this.pending.put(msgId, values); |
| this.collector.emit(values, msgId); |
| index++; |
| if (index >= sentences.length) { |
| index = 0; |
| } |
| count++; |
| total++; |
| if (count > 1000) { |
| count = 0; |
| System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); |
| } |
| Thread.yield(); |
| } |
| |
| @Override |
| public void ack(Object msgId) { |
| this.pending.remove(msgId); |
| } |
| |
| @Override |
| public void fail(Object msgId) { |
| System.out.println("**** RESENDING FAILED TUPLE"); |
| this.collector.emit(this.pending.get(msgId), msgId); |
| } |
| } |
| } |