/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.hive.bolt;

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.*;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
import org.apache.storm.hive.common.HiveWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

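/**
 * A bolt that writes tuples to Hive using the Hive Streaming API. Incoming
 * tuples are written to a per-end-point {@link HiveWriter} and buffered; once
 * the batch is full (or a tick tuple arrives) all writers are flushed and the
 * whole batch is acked. Writers are cached per Hive end point and retired
 * when idle or when the connection limit is exceeded.
 *
 * Illustrative wiring, assuming the storm-hive mapper
 * {@code DelimitedRecordHiveMapper}; the metastore URI, table, and field
 * names below are placeholders, and {@code mySpout} stands in for a real
 * spout:
 * <pre>{@code
 * DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
 *     .withColumnFields(new Fields("id", "name"));
 * HiveOptions hiveOptions =
 *     new HiveOptions("thrift://localhost:9083", "default", "my_table", mapper)
 *         .withTxnsPerBatch(10)
 *         .withBatchSize(1000);
 * TopologyBuilder builder = new TopologyBuilder();
 * builder.setSpout("spout", mySpout);
 * builder.setBolt("hive-bolt", new HiveBolt(hiveOptions), 1)
 *        .shuffleGrouping("spout");
 * }</pre>
 */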
public class HiveBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
    private OutputCollector collector;
    private HiveOptions options;
    private ExecutorService callTimeoutPool;
    private transient Timer heartBeatTimer;
    private Boolean kerberosEnabled = false;
    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    private UserGroupInformation ugi = null;
    // Cache of open writers, keyed by Hive end point.
    HashMap<HiveEndPoint, HiveWriter> allWriters;
    // Tuples buffered since the last flush; acked together once flushed.
    private List<Tuple> tupleBatch;

    public HiveBolt(HiveOptions options) {
        this.options = options;
        tupleBatch = new LinkedList<>();
    }

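    /**
     * Validates the Kerberos settings, authenticates if required, and sets up
     * the writer cache, the call-timeout pool, and the heartbeat timer.
     */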
    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
        try {
            if (options.getKerberosPrincipal() == null && options.getKerberosKeytab() == null) {
                kerberosEnabled = false;
            } else if (options.getKerberosPrincipal() != null && options.getKerberosKeytab() != null) {
                kerberosEnabled = true;
            } else {
                throw new IllegalArgumentException("To enable Kerberos, both KerberosPrincipal" +
                                                   " and KerberosKeytab must be set");
            }

            if (kerberosEnabled) {
                try {
                    ugi = HiveUtils.authenticate(options.getKerberosKeytab(), options.getKerberosPrincipal());
                } catch (HiveUtils.AuthenticationFailed ex) {
                    LOG.error("Hive Kerberos authentication failed " + ex.getMessage(), ex);
                    throw new IllegalArgumentException(ex);
                }
            }
            this.collector = collector;
            allWriters = new HashMap<HiveEndPoint, HiveWriter>();
            String timeoutName = "hive-bolt-%d";
            this.callTimeoutPool = Executors.newFixedThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
            heartBeatTimer = new Timer();
            setupHeartBeatTimer();
        } catch (Exception e) {
            LOG.warn("unable to prepare HiveBolt", e);
        }
    }

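    /**
     * Writes the tuple to its Hive end point and buffers it; once the batch
     * is full (or a tick tuple arrives) all writers are flushed and the whole
     * batch is acked. Tuples that cannot be flushed are failed so the spout
     * can replay them.
     */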
    @Override
    public void execute(Tuple tuple) {
        try {
            boolean forceFlush = false;
            if (TupleUtils.isTick(tuple)) {
                LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize());
                forceFlush = true;
            } else {
                List<String> partitionVals = options.getMapper().mapPartitions(tuple);
                HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
                HiveWriter writer = getOrCreateWriter(endPoint);
                // The heartbeat timer only sets the flag; heartbeats are enabled
                // here, on the executor thread, so writers are never touched
                // concurrently.
                if (timeToSendHeartBeat.compareAndSet(true, false)) {
                    enableHeartBeatOnAllWriters();
                }
                writer.write(options.getMapper().mapRecord(tuple));
                tupleBatch.add(tuple);
                if (tupleBatch.size() >= options.getBatchSize()) {
                    forceFlush = true;
                }
            }
            if (forceFlush && !tupleBatch.isEmpty()) {
                flushAllWriters(true);
                LOG.info("acknowledging tuples after writers flushed");
                for (Tuple t : tupleBatch) {
                    collector.ack(t);
                }
                tupleBatch.clear();
            }
        } catch (Exception e) {
            this.collector.reportError(e);
            collector.fail(tuple);
            // Remove the failed tuple from the batch (it may already have been
            // added above) so it is not acked or failed a second time below.
            tupleBatch.remove(tuple);
            try {
                flushAndCloseWriters();
                LOG.info("acknowledging tuples after writers flushed and closed");
                for (Tuple t : tupleBatch) {
                    collector.ack(t);
                }
                tupleBatch.clear();
            } catch (Exception e1) {
                // If flushAndClose fails, assume the tuples are lost and fail
                // them instead of acking, so they can be replayed.
                LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
                for (Tuple t : tupleBatch) {
                    collector.fail(t);
                }
                tupleBatch.clear();
            }
        }
    }

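    /** This bolt is a sink; it emits no tuples and declares no output fields. */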
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

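    /**
     * Flushes and closes every writer, cancels the heartbeat timer, and shuts
     * down the call-timeout pool.
     */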
    @Override
    public void cleanup() {
        // Stop the heartbeat timer first so no new heartbeats are requested
        // while writers are being closed.
        if (heartBeatTimer != null) {
            heartBeatTimer.cancel();
        }
        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
            try {
                HiveWriter w = entry.getValue();
                LOG.info("Flushing writer to {}", w);
                w.flush(false);
                LOG.info("Closing writer to {}", w);
                w.close();
            } catch (Exception ex) {
                LOG.warn("Error while closing writer to " + entry.getKey() +
                         ". Exception follows.", ex);
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        ExecutorService[] toShutdown = {callTimeoutPool};
        for (ExecutorService execService : toShutdown) {
            execService.shutdown();
            try {
                while (!execService.isTerminated()) {
                    execService.awaitTermination(
                        options.getCallTimeOut(), TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException ex) {
                LOG.warn("shutdown interrupted on " + execService, ex);
            }
        }
        callTimeoutPool = null;
        super.cleanup();
        LOG.info("Hive Bolt stopped");
    }

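    /**
     * Requests tick tuples at the configured interval so partially filled
     * batches are flushed even when the input stream goes quiet.
     */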
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = super.getComponentConfiguration();
        if (conf == null) {
            conf = new Config();
        }
        if (options.getTickTupleInterval() > 0) {
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
        }
        return conf;
    }

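    /**
     * Schedules a one-shot timer that marks writers as needing a heartbeat and
     * then re-arms itself, firing every heartBeatInterval seconds.
     */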
    private void setupHeartBeatTimer() {
        if (options.getHeartBeatInterval() > 0) {
            heartBeatTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    timeToSendHeartBeat.set(true);
                    setupHeartBeatTimer();
                }
            }, options.getHeartBeatInterval() * 1000);
        }
    }

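    /**
     * Flushes every cached writer, optionally rolling each one over to the
     * next transaction batch.
     */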
    void flushAllWriters(boolean rollToNext)
        throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : allWriters.values()) {
            writer.flush(rollToNext);
        }
    }

    /**
     * Closes all writers and removes them from the cache.
     */
    private void closeAllWriters() {
        //1) Retire writers, continuing past individual failures
        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
                LOG.warn("unable to close writer for end point: " + entry.getKey(), e);
            }
        }
        //2) Clear cache
        allWriters.clear();
    }

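    /**
     * Flushes all writers without rolling to a new transaction batch, then
     * closes them; closing is attempted even if the flush fails.
     */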
    void flushAndCloseWriters() throws Exception {
        try {
            flushAllWriters(false);
        } catch (Exception e) {
            LOG.warn("unable to flush hive writers. ", e);
            throw e;
        } finally {
            closeAllWriters();
        }
    }

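    /** Marks every cached writer as needing to send a heartbeat. */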
    private void enableHeartBeatOnAllWriters() {
        for (HiveWriter writer : allWriters.values()) {
            writer.setHeartBeatNeeded();
        }
    }

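    /**
     * Returns the cached writer for the given end point, creating one if
     * absent. When the cache exceeds maxOpenConnections, idle writers are
     * retired first, then the least recently used one.
     */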
    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
        throws HiveWriter.ConnectFailure, InterruptedException {
        try {
            HiveWriter writer = allWriters.get(endPoint);
            if (writer == null) {
                LOG.debug("Creating Writer to Hive end point : {}", endPoint);
                writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
                if (allWriters.size() > options.getMaxOpenConnections()) {
                    int retired = retireIdleWriters();
                    if (retired == 0) {
                        retireEldestWriter();
                    }
                }
                allWriters.put(endPoint, writer);
            }
            return writer;
        } catch (HiveWriter.ConnectFailure e) {
            LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
            throw e;
        }
    }

    /**
     * Locates the writer that has been unused for the longest time and retires it.
     */
    private void retireEldestWriter() {
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                eldest = entry.getKey();
                oldestTimeStamp = entry.getValue().getLastUsed();
            }
        }
        try {
            LOG.info("Closing least recently used Writer to Hive end point : {}", eldest);
            allWriters.remove(eldest).close();
        } catch (IOException e) {
            LOG.warn("Failed to close writer for end point: " + eldest, e);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Locates all writers past the idle timeout and retires them.
     * @return number of writers retired
     */
    private int retireIdleWriters() {
        int count = 0;
        long now = System.currentTimeMillis();
        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();

        //1) Find retirement candidates
        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
                ++count;
                retirees.add(entry.getKey());
            }
        }
        //2) Retire them
        for (HiveEndPoint ep : retirees) {
            try {
                LOG.info("Closing idle Writer to Hive end point : {}", ep);
                allWriters.remove(ep).close();
            } catch (IOException e) {
                LOG.warn("Failed to close writer for end point: {}", ep, e);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
                Thread.currentThread().interrupt();
            }
        }
        return count;
    }

}