/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.streaming;

import org.apache.hadoop.hive.metastore.api.TxnToWriteId;

import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Common methods for the implementing classes.
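 * <p>
 * A typical lifecycle, as seen through the {@link StreamingTransaction} interface that this
 * class partially implements (an illustrative sketch, not part of this file; {@code txn}
 * would be an instance of a concrete subclass):
 * <pre>{@code
 * StreamingTransaction txn = ...; // e.g. created by a HiveStreamingConnection
 * txn.beginNextTransaction();     // advance to the next txn in the batch
 * txn.write("1,foo".getBytes());  // goes through the configured RecordWriter
 * txn.commit();                   // or txn.abort() on failure
 * txn.close();                    // once the whole batch is done
 * }</pre>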
*/
abstract class AbstractStreamingTransaction
    implements StreamingTransaction {
  /**
   * This variable should be initialized by the subclasses.
   */
  protected RecordWriter recordWriter;

  /**
   * This variable should be initialized by the subclasses.
   */
  protected List<TxnToWriteId> txnToWriteIds;

  /**
   * Once any operation on this batch encounters a system exception
   * (e.g. IOException on write), it is safest to assume that we can't write to the
   * file backing this batch any more. This flag guards the important public methods.
   */
  protected final AtomicBoolean isTxnClosed = new AtomicBoolean(false);

  protected int currentTxnIndex = -1;
  protected HiveStreamingConnection.TxnState state;
  protected void checkIsClosed() throws StreamingException {
    if (isTxnClosed.get()) {
      throw new StreamingException("Transaction " + toString() + " is closed");
    }
  }
  protected void beginNextTransactionImpl(String errorMessage)
      throws StreamingException {
    state = HiveStreamingConnection.TxnState.INACTIVE; // clear state from previous txn
    if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
      throw new InvalidTransactionState(errorMessage);
    }
    currentTxnIndex++;
    state = HiveStreamingConnection.TxnState.OPEN;
  }
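
  /**
   * Writes one record inside the currently open transaction. A {@link SerializationError}
   * leaves the batch usable, so the caller may keep going and divert the bad record instead
   * (an illustrative sketch; {@code deadLetterQueue} is a hypothetical caller-side sink):
   * <pre>{@code
   * try {
   *   txn.write(record);
   * } catch (SerializationError e) {
   *   deadLetterQueue.add(record); // hypothetical: drop or re-route the unparseable record
   * }
   * }</pre>
   * Any other failure closes the batch via {@link #markDead(boolean)}.
   */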
  public void write(final byte[] record) throws StreamingException {
    checkIsClosed();
    boolean success = false;
    try {
      recordWriter.write(getCurrentWriteId(), record);
      success = true;
    } catch (SerializationError ex) {
      // This exception indicates that the record could not be parsed and the caller
      // can decide whether to drop it or send it to a dead letter queue.
      // Rolling back the txn and retrying won't help since the tuple will be exactly
      // the same when it's replayed, so the batch is not marked dead here.
      success = true;
      throw ex;
    } finally {
      markDead(success);
    }
  }
  public void write(final InputStream inputStream) throws StreamingException {
    checkIsClosed();
    boolean success = false;
    try {
      recordWriter.write(getCurrentWriteId(), inputStream);
      success = true;
    } catch (SerializationError ex) {
      // This exception indicates that a record from the input stream could not be parsed
      // and the caller can decide whether to drop it or send it to a dead letter queue.
      // Rolling back the txn and retrying won't help since the tuple will be exactly
      // the same when it's replayed, so the batch is not marked dead here.
      success = true;
      throw ex;
    } finally {
      markDead(success);
    }
  }
  /**
   * A transaction batch opens a single HDFS file and writes multiple transactions to it. If
   * there is any issue with a write, we can't continue to write to the same file any more,
   * as it may now be corrupted at the tail. Closing the batch here ensures that a client
   * can't ignore such failures and continue to write.
   */
  protected void markDead(boolean success) throws StreamingException {
    if (success) {
      return;
    }
    close();
  }
  public long getCurrentWriteId() {
    if (currentTxnIndex >= 0) {
      return txnToWriteIds.get(currentTxnIndex).getWriteId();
    }
    return -1L;
  }
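
  /**
   * Returns how many transactions of this batch are still unused. For example (illustrative
   * numbers, not from this file): with 5 write ids in the batch and the second transaction
   * open ({@code currentTxnIndex == 1}), this returns 5 - 1 - 1 = 3; before the first
   * transaction is begun ({@code currentTxnIndex == -1}) it returns the full batch size of 5.
   */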
  public int remainingTransactions() {
    if (currentTxnIndex >= 0) {
      return txnToWriteIds.size() - currentTxnIndex - 1;
    }
    return txnToWriteIds.size();
  }

  public boolean isClosed() {
    return isTxnClosed.get();
  }

  public HiveStreamingConnection.TxnState getCurrentTransactionState() {
    return state;
  }

  public long getCurrentTxnId() {
    if (currentTxnIndex >= 0) {
      return txnToWriteIds.get(currentTxnIndex).getTxnId();
    }
    return -1L;
  }

  @Override
  public List<TxnToWriteId> getTxnToWriteIds() {
    return txnToWriteIds;
  }
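
  /**
   * Commits the currently open transaction; this no-arg overload passes {@code null}
   * partitions to {@link #commit(Set)}. When writes created partitions dynamically, the
   * caller can hand their specs to the other overload, presumably so an implementation can
   * register them on commit (an illustrative sketch; the partition spec is an assumption):
   * <pre>{@code
   * Set<String> created = new HashSet<>();
   * created.add("year=2024/month=01"); // hypothetical partition touched by the writes
   * txn.commit(created);
   * }</pre>
   */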
  public void commit() throws StreamingException {
    commit(null);
  }

  public void commit(Set<String> partitions) throws StreamingException {
    commit(partitions, null, null);
  }
}