/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hive.common;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.storm.hive.bolt.mapper.HiveMapper;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
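/**
 * Writes records to a single Hive {@link HiveEndPoint} over the Hive
 * streaming API, funnelling every potentially blocking Hive call through a
 * caller-supplied executor so each call can be bounded by a timeout.
 *
 * <p>A rough lifecycle sketch (the endpoint values, pool, mapper and ugi
 * below are illustrative placeholders, not defined in this class):
 *
 * <pre>{@code
 * HiveEndPoint endPoint =
 *     new HiveEndPoint("thrift://metastore:9083", "default", "events", null);
 * HiveWriter writer = new HiveWriter(endPoint, 100, true, 10000,
 *                                    callTimeoutPool, mapper, ugi);
 * writer.write("a,b".getBytes(StandardCharsets.UTF_8));
 * writer.flush(true);  // commit and move to the next txn
 * writer.close();      // close the txn batch and the connection
 * }</pre>
 */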
public class HiveWriter {
private static final Logger LOG = LoggerFactory
.getLogger(HiveWriter.class);
private final HiveEndPoint endPoint;
private final StreamingConnection connection;
private final int txnsPerBatch;
private final RecordWriter recordWriter;
private TransactionBatch txnBatch;
private final ExecutorService callTimeoutPool;
private final long callTimeout;
private long lastUsed; // time of last flush on this writer
protected boolean closed; // flag indicating HiveWriter was closed
private boolean autoCreatePartitions;
private boolean heartBeatNeeded = false;
private UserGroupInformation ugi;
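    /**
     * Opens a streaming connection to {@code endPoint} (on behalf of
     * {@code ugi} when a secure identity is supplied) and acquires the first
     * transaction batch. Any failure other than interruption is wrapped in a
     * {@link ConnectFailure}.
     */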
public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
boolean autoCreatePartitions, long callTimeout,
ExecutorService callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
try {
this.autoCreatePartitions = autoCreatePartitions;
this.callTimeout = callTimeout;
this.callTimeoutPool = callTimeoutPool;
this.endPoint = endPoint;
this.ugi = ugi;
this.connection = newConnection(ugi);
this.txnsPerBatch = txnsPerBatch;
this.recordWriter = mapper.createRecordWriter(endPoint);
this.txnBatch = nextTxnBatch(recordWriter);
this.closed = false;
this.lastUsed = System.currentTimeMillis();
} catch(InterruptedException e) {
throw e;
} catch(RuntimeException e) {
throw e;
} catch(Exception e) {
throw new ConnectFailure(endPoint, e);
}
}
@Override
public String toString() {
return endPoint.toString();
}
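    /**
     * Marks this writer so that the next {@link #flush(boolean)} first sends
     * a heartbeat on the open transaction batch, keeping its txns from being
     * timed out by the metastore. Intended to be flipped periodically (e.g.
     * from a timer) by the owner of this writer.
     */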
public void setHeartBeatNeeded() {
heartBeatNeeded = true;
}
    /**
     * Writes one record into the currently open Txn. The write runs on the
     * callTimeoutPool; a streaming error or timeout surfaces as WriteFailure.
*
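     * <pre>{@code
     * // a minimal sketch: records are delimited text encoded to bytes
     * writer.write("field1,field2".getBytes(StandardCharsets.UTF_8));
     * }</pre>
     *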
* @throws org.apache.storm.hive.common.HiveWriter.WriteFailure
* @throws InterruptedException
*/
public synchronized void write(final byte[] record)
throws WriteFailure, InterruptedException {
if (closed) {
throw new IllegalStateException("This hive streaming writer was closed " +
"and thus no longer able to write : " + endPoint);
}
// write the tuple
try {
LOG.debug("Writing event to {}", endPoint);
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws StreamingException, InterruptedException {
txnBatch.write(record);
return null;
}
});
} catch(StreamingException e) {
throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
} catch(TimeoutException e) {
throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
}
}
    /**
     * Commits the current Txn. If {@code rollToNext} is true, switches to the
     * next Txn in the batch, first fetching a fresh TxnBatch when the current
     * one is exhausted.
     * TODO: see what to do when there are errors in each IO call stage
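     *
     * <pre>{@code
     * // a minimal sketch of the two modes:
     * writer.flush(true);   // commit, then be ready for further writes
     * writer.flush(false);  // commit only, e.g. right before close()
     * }</pre>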
*/
public void flush(boolean rollToNext)
throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
if(heartBeatNeeded) {
heartBeatNeeded = false;
heartBeat();
}
lastUsed = System.currentTimeMillis();
try {
commitTxn();
if(txnBatch.remainingTransactions() == 0) {
closeTxnBatch();
txnBatch = null;
if(rollToNext) {
txnBatch = nextTxnBatch(recordWriter);
}
            } else if(rollToNext) {
                // nextTxnBatch() already begins the first txn of a fresh
                // batch, so only advance when still on the current batch
                LOG.debug("Switching to next Txn for {}", endPoint);
                txnBatch.beginNextTransaction(); // does not block
            }
} catch(StreamingException e) {
throw new TxnFailure(txnBatch, e);
}
}
    /**
     * Sends a heartbeat on the current and remaining txns in the batch,
     * running the call on the callTimeoutPool. Heartbeat failures are logged
     * and suppressed; they are not fatal to the writer.
     */
    public void heartBeat() throws InterruptedException {
        // run the heartbeat on a pool thread and wait up to callTimeout
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
try {
LOG.debug("Sending heartbeat on batch " + txnBatch);
txnBatch.heartbeat();
} catch (StreamingException e) {
LOG.warn("Heartbeat error on batch " + txnBatch, e);
}
return null;
}
});
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
// Suppressing exceptions as we don't care for errors on heartbeats
}
}
/**
* Close the Transaction Batch and connection
* @throws IOException
* @throws InterruptedException
*/
public void close() throws IOException, InterruptedException {
closeTxnBatch();
closeConnection();
closed = true;
}
private void closeConnection() throws InterruptedException {
LOG.info("Closing connection to end point : {}", endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
connection.close(); // could block
return null;
}
});
        } catch(InterruptedException e) {
            throw e;
        } catch(Exception e) {
LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
// Suppressing exceptions as we don't care for errors on connection close
}
}
private void commitTxn() throws CommitFailure, InterruptedException {
LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
txnBatch.commit(); // could block
return null;
}
});
} catch (StreamingException e) {
throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
} catch (TimeoutException e) {
throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
}
}
private StreamingConnection newConnection(final UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
try {
return callWithTimeout(new CallRunner<StreamingConnection>() {
@Override
public StreamingConnection call() throws Exception {
return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
}
});
} catch(StreamingException e) {
throw new ConnectFailure(endPoint, e);
} catch(TimeoutException e) {
throw new ConnectFailure(endPoint, e);
}
}
private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
throws InterruptedException, TxnBatchFailure {
LOG.debug("Fetching new Txn Batch for {}", endPoint);
TransactionBatch batch = null;
try {
batch = callWithTimeout(new CallRunner<TransactionBatch>() {
@Override
public TransactionBatch call() throws Exception {
return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
}
});
LOG.debug("Acquired {}. Switching to first txn", batch);
batch.beginNextTransaction();
} catch(TimeoutException e) {
throw new TxnBatchFailure(endPoint, e);
} catch(StreamingException e) {
throw new TxnBatchFailure(endPoint, e);
}
return batch;
}
private void closeTxnBatch() throws InterruptedException {
try {
LOG.debug("Closing Txn Batch {}", txnBatch);
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
if(txnBatch != null) {
txnBatch.close(); // could block
}
return null;
}
});
} catch(InterruptedException e) {
throw e;
} catch(Exception e) {
LOG.warn("Error closing txn batch "+ txnBatch, e);
}
}
    /**
     * Aborts the current Txn in the batch. Timeouts and abort errors are
     * logged and suppressed.
     * @throws InterruptedException if interrupted while aborting
     */
public void abort() throws InterruptedException {
abortTxn();
}
private void abortTxn() throws InterruptedException {
LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
try {
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws StreamingException, InterruptedException {
txnBatch.abort(); // could block
return null;
}
});
} catch (InterruptedException e) {
throw e;
} catch (TimeoutException e) {
LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
} catch (Exception e) {
LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
// Suppressing exceptions as we don't care for errors on abort
}
}
    /**
     * If the current thread has been interrupted, clears its interrupt flag
     * and throws an InterruptedException.
     * @throws InterruptedException
     */
    private static void checkAndThrowInterruptedException()
        throws InterruptedException {
        if (Thread.interrupted()) {
throw new InterruptedException("Timed out before Hive call was made. "
+ "Your callTimeout might be set too low or Hive calls are "
+ "taking too long.");
}
}
    /**
     * Executes the callable on the callTimeoutPool and waits up to
     * callTimeout milliseconds for completion (indefinitely when the timeout
     * is not positive). On timeout the task is cancelled and the
     * TimeoutException rethrown; causes of ExecutionExceptions are unwrapped
     * and rethrown as the closest matching declared type.
     */
private <T> T callWithTimeout(final CallRunner<T> callRunner)
throws TimeoutException, StreamingException, InterruptedException {
Future<T> future = callTimeoutPool.submit(new Callable<T>() {
@Override
public T call() throws Exception {
return callRunner.call();
}
});
try {
if (callTimeout > 0) {
return future.get(callTimeout, TimeUnit.MILLISECONDS);
} else {
return future.get();
}
} catch (TimeoutException eT) {
future.cancel(true);
throw eT;
} catch (ExecutionException e1) {
Throwable cause = e1.getCause();
if (cause instanceof IOException) {
throw new StreamingIOFailure("I/O Failure", (IOException) cause);
} else if (cause instanceof StreamingException) {
throw (StreamingException) cause;
} else if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof TimeoutException) {
throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
} else {
throw new RuntimeException(e1);
}
}
}
public long getLastUsed() {
return lastUsed;
}
    private byte[] generateRecord(Tuple tuple) {
        // join the tuple values into comma-delimited text (a trailing comma
        // included) and encode with an explicit charset rather than the
        // platform default
        StringBuilder buf = new StringBuilder();
        for (Object o: tuple.getValues()) {
            buf.append(o);
            buf.append(",");
        }
        return buf.toString().getBytes(StandardCharsets.UTF_8);
    }
    /**
     * Simple interface whose <tt>call</tt> method is run by
     * {@link #callWithTimeout} on a thread from the callTimeoutPool.
     * @param <T> the result type of the call
     */
private interface CallRunner<T> {
T call() throws Exception;
}
public static class Failure extends Exception {
public Failure(String message, Throwable cause) {
super(message, cause);
}
}
public static class WriteFailure extends Failure {
public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
}
}
public static class CommitFailure extends Failure {
public CommitFailure(HiveEndPoint endPoint, Long txnID, Throwable cause) {
super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
}
}
public static class ConnectFailure extends Failure {
public ConnectFailure(HiveEndPoint ep, Throwable cause) {
super("Failed connecting to EndPoint " + ep, cause);
}
}
public static class TxnBatchFailure extends Failure {
public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
}
}
public static class TxnFailure extends Failure {
public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
}
}
}