/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.kafka;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Transactional Kafka record writer used to achieve exactly-once semantics.
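*
* <p>A minimal usage sketch (illustrative only: the broker address, topic, paths and the way the
* transactional id reaches the underlying producer are assumptions, not taken from this class):
* <pre>{@code
* Properties props = new Properties();
* props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
* props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "hive_query_id_task_0"); // illustrative id
* // key/value serializers are forced to ByteArraySerializer by the constructor
* TransactionalKafkaWriter writer =
*     new TransactionalKafkaWriter("my_topic", props, queryWorkingPath, fileSystem, false);
* writer.write(kafkaWritable);  // rows are buffered/sent inside the open transaction
* writer.close(false);          // flush, then persist the TX state for HS2 to commit
* }</pre>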
*/
class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<BytesWritable, KafkaWritable> {
private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
private static final String TRANSACTION_DIR = "transaction_states";
private static final Duration DURATION_0 = Duration.ofMillis(0);
private final String topic;
private final HiveKafkaProducer<byte[], byte[]> producer;
private final Callback callback;
private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
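// Checkpoint file used to persist the open TX state when the commit is delegated to HS2 (null in optimistic mode).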
private final Path openTxFileName;
private final boolean optimisticCommit;
private final FileSystem fileSystem;
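// Most recent acknowledged offset per topic partition, maintained by the producer send callback.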
private final Map<TopicPartition, Long> offsets = new HashMap<>();
private final String writerIdTopicId;
private final long producerId;
private final short producerEpoch;
private long sentRecords = 0L;
/**
* @param topic Kafka topic.
* @param producerProperties kafka producer properties.
* @param queryWorkingPath the query working directory, i.e. table_directory/hive_query_id.
* Used to store the state of the transaction and/or log the sent records and partitions.
* For more information see:
* {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}
* @param fileSystem file system handler.
* @param optimisticCommit if true the commit happens at the task level, otherwise it is delegated to HS2.
*/
TransactionalKafkaWriter(String topic, Properties producerProperties,
Path queryWorkingPath,
FileSystem fileSystem,
@Nullable Boolean optimisticCommit) {
this.fileSystem = fileSystem;
this.topic = Preconditions.checkNotNull(topic, "NULL topic !!");
Preconditions.checkState(producerProperties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
"set [" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + "] property");
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.producer = new HiveKafkaProducer<>(producerProperties);
this.optimisticCommit = optimisticCommit == null ? true : optimisticCommit;
this.callback = (metadata, exception) -> {
if (exception != null) {
sendExceptionRef.compareAndSet(null, exception);
} else {
//According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
//Callbacks from the same TopicPartition are invoked in order, thus this keeps track of the most recent offset.
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
}
};
// Start Tx
assert producer.getTransactionalId() != null;
try {
producer.initTransactions();
producer.beginTransaction();
} catch (Exception exception) {
logHints(exception);
if (tryToAbortTx(exception)) {
LOG.error("Aborting Transaction [{}] cause by ERROR [{}]",
producer.getTransactionalId(),
exception.getMessage());
producer.abortTransaction();
}
LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage());
producer.close(DURATION_0);
throw exception;
}
writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic);
producerEpoch = this.optimisticCommit ? -1 : producer.getEpoch();
producerId = this.optimisticCommit ? -1 : producer.getProducerId();
LOG.info("DONE with Initialization of {}, Epoch[{}], internal ID[{}]", writerIdTopicId, producerEpoch, producerId);
//Writer base working directory
openTxFileName =
this.optimisticCommit ?
null :
new Path(new Path(new Path(queryWorkingPath, TRANSACTION_DIR), producer.getTransactionalId()),
String.valueOf(producerEpoch));
}
@Override public void write(Writable w) throws IOException {
checkExceptions();
try {
sentRecords++;
producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) w), callback);
} catch (Exception e) {
if (tryToAbortTx(e)) {
// producer.send() may throw a KafkaException wrapping a ProducerFencedException; tryToAbortTx checks the inner cause.
producer.abortTransaction();
}
producer.close(DURATION_0);
sendExceptionRef.compareAndSet(null, e);
checkExceptions();
}
}
private void logHints(Exception e) {
if (e instanceof TimeoutException) {
LOG.error("Maybe Try to increase [`retry.backoff.ms`] to avoid this error [{}].", e.getMessage());
}
}
/**
* The non-abort close is done in two parts.
* Part one flushes all buffered records to Kafka, then logs the (Topic-Partition, Offset) pairs.
* Part two either commits the TX or saves the TX state to a WAL and lets HS2 do the commit.
*
* @param abort if set to true, aborts the flush and exits
* @throws IOException exception causing the failure
*/
@Override public void close(boolean abort) throws IOException {
if (abort) {
// Abort case: try to abort the transaction -> close the producer ASAP -> exit.
LOG.warn("Aborting Transaction and stopping sends from {}", writerIdTopicId);
try {
producer.abortTransaction();
} catch (Exception e) {
LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
}
producer.close(DURATION_0);
return;
}
// Normal case -> log and commit, then close.
LOG.info("Flushing Kafka buffer of writerId {}", writerIdTopicId);
producer.flush();
// No exception so far; log to a file whatever was flushed.
String formattedMsg = "Topic[%s] Partition [%s] -> Last offset [%s]";
String
flushedOffsetMsg =
offsets.entrySet()
.stream()
.map(topicPartitionLongEntry -> String.format(formattedMsg,
topicPartitionLongEntry.getKey().topic(),
topicPartitionLongEntry.getKey().partition(),
topicPartitionLongEntry.getValue()))
.collect(Collectors.joining(","));
LOG.info("WriterId {} flushed the following [{}] ", writerIdTopicId, flushedOffsetMsg);
// OPTIMISTIC COMMIT OR PERSIST STATE OF THE TX_WAL
checkExceptions();
if (optimisticCommit) {
// Case Commit at the task level
commitTransaction();
} else {
// Case delegate TX commit to HS2
persistTxState();
}
checkExceptions();
LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
producer.getTransactionalId(),
sentRecords,
topic);
producer.close(Duration.ZERO);
}
private void commitTransaction() {
LOG.info("Attempting Optimistic commit by {}", writerIdTopicId);
try {
producer.commitTransaction();
} catch (Exception e) {
sendExceptionRef.compareAndSet(null, e);
}
}
/**
* Write the Kafka producer PID and epoch to the checkpoint file {@link TransactionalKafkaWriter#openTxFileName}.
*/
private void persistTxState() {
LOG.info("Committing state to path [{}] by [{}]", openTxFileName.toString(), writerIdTopicId);
try (FSDataOutputStream outStream = fileSystem.create(openTxFileName)) {
outStream.writeLong(producerId);
outStream.writeShort(producerEpoch);
} catch (Exception e) {
sendExceptionRef.compareAndSet(null, e);
}
}
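// Hadoop RecordWriter entry point; the BytesWritable key is ignored, only the KafkaWritable value is sent.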
@Override public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
write(kafkaWritable);
}
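// MapReduce-style close; the Reporter is unused and the close is treated as a normal (non-abort) close.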
@Override public void close(Reporter reporter) throws IOException {
close(false);
}
@VisibleForTesting long getSentRecords() {
return sentRecords;
}
@VisibleForTesting short getProducerEpoch() {
return producerEpoch;
}
@VisibleForTesting long getProducerId() {
return producerId;
}
/**
* Checks for an existing exception. In case of exception, aborts the transaction if possible, closes the producer and rethrows as IOException.
* @throws IOException wrapping the pending send exception, if any.
*/
private void checkExceptions() throws IOException {
if (sendExceptionRef.get() != null && sendExceptionRef.get() instanceof KafkaException && sendExceptionRef.get()
.getCause() instanceof ProducerFencedException) {
// producer.send() may throw a KafkaException wrapping a ProducerFencedException; unwrap and rethrow the inner cause.
sendExceptionRef.updateAndGet(e -> (KafkaException) e.getCause());
}
if (sendExceptionRef.get() != null) {
final Exception exception = sendExceptionRef.get();
logHints(exception);
if (tryToAbortTx(exception)) {
LOG.error("Aborting Transaction [{}] cause by ERROR [{}]", writerIdTopicId, exception.getMessage());
producer.abortTransaction();
}
LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
producer.close(DURATION_0);
throw new IOException(exception);
}
}
private boolean tryToAbortTx(Throwable e) {
// According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
// We can't recover from these exceptions, so our only option is to close the producer and exit.
boolean
isNotFencedOut =
!(e instanceof ProducerFencedException)
&& !(e instanceof OutOfOrderSequenceException)
&& !(e instanceof AuthenticationException);
// producer.send() may throw a KafkaException which wraps a ProducerFencedException, therefore check the inner cause.
boolean causeIsNotFencedOut = !(e.getCause() != null && e.getCause() instanceof ProducerFencedException);
return isNotFencedOut && causeIsNotFencedOut;
}
/**
* Given a query working directory, table_directory/hive_query_id/, fetches the open transaction states.
* Table directory is {@link org.apache.hadoop.hive.metastore.api.Table#getSd()#getLocation()}.
* Hive Query ID is inferred from the JobConf, see {@link KafkaStorageHandler#getQueryId()}.
*
* The path to a transaction state is as follows:
* .../{@code queryWorkingDir}/{@code TRANSACTION_DIR}/{@code writerId}/{@code producerEpoch}
*
* The actual state is stored in the file {@code producerEpoch}.
* The file contains a {@link Long} as the internal producer id and a {@link Short} as the producer epoch.
* According to the Kafka API, the highest epoch corresponds to the active producer; therefore, if there are
* multiple {@code producerEpoch} files, the maximum is picked based on {@link Short#compareTo(Short)}.
*
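* <p>A hedged consumption sketch for the returned map (the actual commit is expected to be driven by
* the storage handler on the HS2 side; the logger usage below is illustrative):
* <pre>{@code
* Map<String, Pair<Long, Short>> states = TransactionalKafkaWriter.getTransactionsState(fs, queryWorkingDir);
* states.forEach((writerId, pidAndEpoch) ->
*     LOG.info("Open TX writer [{}], PID [{}], epoch [{}]",
*         writerId, pidAndEpoch.getLeft(), pidAndEpoch.getRight()));
* }</pre>
*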
* @param fs File system handler.
* @param queryWorkingDir Query working Directory, see:
* {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}.
* @return Map of Transaction Ids to Pair of Kafka Producer internal ID (Long) and producer epoch (short)
* @throws IOException if any of the IO operations fail.
*/
static Map<String, Pair<Long, Short>> getTransactionsState(FileSystem fs, Path queryWorkingDir) throws IOException {
// List all transaction directories under the query working directory.
final Path transactionWorkingDir = new Path(queryWorkingDir, TRANSACTION_DIR);
final FileStatus[] files = fs.listStatus(transactionWorkingDir);
final Set<FileStatus>
transactionSet =
Arrays.stream(files).filter(FileStatus::isDirectory).collect(Collectors.toSet());
Set<Path> setOfTxPath = transactionSet.stream().map(FileStatus::getPath).collect(Collectors.toSet());
ImmutableMap.Builder<String, Pair<Long, Short>> builder = ImmutableMap.builder();
setOfTxPath.forEach(path -> {
final String txId = path.getName();
try {
FileStatus[] epochFiles = fs.listStatus(path);
// List all the epoch files, if any, and select the max.
// According to the Kafka API, a more recent producer with the same TxID will have a greater epoch and the same PID.
Optional<Short>
maxEpoch =
Arrays.stream(epochFiles)
.filter(FileStatus::isFile)
.map(fileStatus -> Short.valueOf(fileStatus.getPath().getName()))
.max(Short::compareTo);
short
epoch =
maxEpoch.orElseThrow(() -> new RuntimeException("Missing epoch file under directory ["
+ path.toString()
+ "]"));
Path openTxFileName = new Path(path, String.valueOf(epoch));
long internalId;
try (FSDataInputStream inStream = fs.open(openTxFileName)) {
internalId = inStream.readLong();
short fileEpoch = inStream.readShort();
if (epoch != fileEpoch) {
throw new RuntimeException(String.format("Was expecting [%s] but got [%s] from path [%s]",
epoch,
fileEpoch,
path.toString()));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
builder.put(txId, Pair.of(internalId, epoch));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return builder.build();
}
}