/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.azureblob.avro;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements {@link org.apache.samza.system.azureblob.producer.AzureBlobWriter}
* for writing avro records to Azure Blob Storage.
*
* It uses {@link org.apache.avro.file.DataFileWriter} to convert avro records it receives to byte[].
* This byte[] is passed on to {@link org.apache.samza.system.azureblob.avro.AzureBlobOutputStream}.
* AzureBlobOutputStream in turn uploads data to Storage as a blob.
*
* It also accepts encoded records as byte[] as long as the first OutgoingMessageEnvelope this writer receives
* is a decoded record from which to get the schema and record type (GenericRecord vs SpecificRecord).
* The subsequent encoded records are written directly to AzureBlobOutputStream without checking if they conform
* to the schema. It is the responsibility of the user to ensure this. Failing to do so may result in an
* unreadable avro blob.
*
* It expects all OutgoingMessageEnvelopes to be of the same schema.
* To handle schema evolution (sending envelopes with a different schema), this writer has to be closed and a new writer
* has to be created. The first envelope sent to the new writer should contain a valid record to get the schema from.
* If used by AzureBlobSystemProducer, this is done through systemProducer.flush(source).
*
* Once closed, this object cannot be used again.
* This class is thread-safe.
*
* If the number of records or size of the current blob exceeds the specified limits then a new blob is created.
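*
* A minimal usage sketch is shown below. In a Samza job this writer is typically created by the Azure Blob
* system producer; the direct construction here is only for illustration, and the container client, thread pool,
* metrics, compression and metadata generator instances are assumed to be built elsewhere. The numeric limits
* are illustrative values, not defaults.
* <pre>{@code
*   AzureBlobAvroWriter writer = new AzureBlobAvroWriter(containerAsyncClient, "myStream/partition-0",
*       blobUploadThreadPool, writerMetrics, metadataGeneratorFactory, metadataGeneratorConfig, "myStream",
*       10 * 1024 * 1024,     // maxBlockFlushThresholdSize: upload a block roughly every 10 MB
*       60000,                // flushTimeoutMs
*       compression,
*       true,                 // useRandomStringInBlobName
*       150 * 1024 * 1024,    // maxBlobSize
*       100000);              // maxRecordsPerBlob
*   writer.write(envelopeWithIndexedRecord); // the first envelope must carry a decoded IndexedRecord
*   writer.write(envelopeWithEncodedBytes);  // subsequent envelopes may carry pre-encoded byte[]
*   writer.flush();
*   writer.close();
* }</pre>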
*/
public class AzureBlobAvroWriter implements AzureBlobWriter {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobAvroWriter.class);
private static final String PUBLISHED_FILE_NAME_DATE_FORMAT = "yyyy/MM/dd/HH/mm-ss";
private static final String BLOB_NAME_AVRO = "%s/%s.avro%s";
private static final String BLOB_NAME_RANDOM_STRING_AVRO = "%s/%s-%s.avro%s";
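// Resulting blob names look like "<prefix>/yyyy/MM/dd/HH/mm-ss.avro<compression-extension>" or, when a random
// string is used, "<prefix>/yyyy/MM/dd/HH/mm-ss-<8-char-random>.avro<compression-extension>".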
private static final SimpleDateFormat UTC_FORMATTER = buildUTCFormatter();
// Avro's DataFileWriter has internal buffers and also adds metadata.
// Based on the current default sizes of these buffers and metadata, the data overhead is a little less than 100KB.
// However, the overhead is capped at 1MB here to ensure enough room if the default values are increased.
static final long DATAFILEWRITER_OVERHEAD = 1000000; // 1MB
// currentBlobWriterComponents == null only for the first blob immediately after this AzureBlobAvroWriter object has been created.
// For the rest of this object's lifecycle, currentBlobWriterComponents is not null.
private BlobWriterComponents currentBlobWriterComponents = null;
private final List<BlobWriterComponents> allBlobWriterComponents = new ArrayList<>();
private Schema schema = null;
// datumWriter == null only for the first blob immediately after this AzureBlobAvroWriter object has been created.
// It is created from the schema taken from the first OutgoingMessageEnvelope. Hence the first OME has to be a decoded avro record.
// For the rest of this object's lifecycle, datumWriter is not null.
private DatumWriter<IndexedRecord> datumWriter = null;
private volatile boolean isClosed = false;
private final Executor blobThreadPool;
private final AzureBlobWriterMetrics metrics;
private final int maxBlockFlushThresholdSize;
private final long flushTimeoutMs;
private final Compression compression;
private final BlobContainerAsyncClient containerAsyncClient;
private final String blobURLPrefix;
private final long maxBlobSize;
private final long maxRecordsPerBlob;
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private Config blobMetadataGeneratorConfig;
private String streamName;
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {
this.blobThreadPool = blobThreadPool;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
this.flushTimeoutMs = flushTimeoutMs;
this.compression = compression;
this.containerAsyncClient = containerAsyncClient;
this.blobURLPrefix = blobURLPrefix;
this.useRandomStringInBlobName = useRandomStringInBlobName;
this.maxBlobSize = maxBlobSize;
this.maxRecordsPerBlob = maxRecordsPerBlob;
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
}
/**
* This method expects the {@link org.apache.samza.system.OutgoingMessageEnvelope}
* to contain a message which is either a {@link org.apache.avro.generic.IndexedRecord} or an already encoded record as a byte[].
* If the record is already encoded, the byte[] is written directly to the output stream without checking that it conforms to the schema.
* Otherwise, the record is encoded and then written to the output stream.
* However, the first envelope should always be a record and not a byte[].
* If the block size threshold is crossed, the output stream contents are uploaded as a block.
* If the number of records in the current blob or the size of the current blob exceeds its limit, a new blob is created.
* Multi-threading and thread-safety:
* The underlying {@link org.apache.avro.file.DataFileWriter} is not thread-safe, so all accesses to it are wrapped
* in a synchronized block. write(OutgoingMessageEnvelope) allows multiple threads to encode records concurrently,
* as that operation is stateless, but serializes access to the shared writer components through a common lock,
* which ensures thread safety.
* @param ome - OutgoingMessageEnvelope that contains the IndexedRecord (GenericRecord or SpecificRecord) or an encoded record as byte[]
* @throws IOException when the underlying dataFileWriter.appendEncoded fails
* @throws IllegalArgumentException when the OutgoingMessageEnvelope's message is neither an IndexedRecord nor a byte[]
* @throws IllegalStateException when the first OutgoingMessageEnvelope's message is not a record to take the schema from.
*/
@Override
public void write(OutgoingMessageEnvelope ome) throws IOException {
Optional<IndexedRecord> optionalIndexedRecord;
byte[] encodedRecord;
if (ome.getMessage() instanceof IndexedRecord) {
optionalIndexedRecord = Optional.of((IndexedRecord) ome.getMessage());
encodedRecord = encodeRecord((IndexedRecord) ome.getMessage());
} else if (ome.getMessage() instanceof byte[]) {
optionalIndexedRecord = Optional.empty();
encodedRecord = (byte[]) ome.getMessage();
} else {
throw new IllegalArgumentException("AzureBlobAvroWriter only supports IndexedRecord and byte[].");
}
synchronized (currentDataFileWriterLock) {
// if currentBlobWriterComponents is null, then it is the first blob of this AzureBlobAvroWriter object
if (currentBlobWriterComponents == null || willCurrentBlobExceedSize(encodedRecord) || willCurrentBlobExceedRecordLimit()) {
startNextBlob(optionalIndexedRecord);
}
currentBlobWriterComponents.dataFileWriter.appendEncoded(ByteBuffer.wrap(encodedRecord));
recordsInCurrentBlob++;
// incrementNumberOfRecordsInBlob should always be invoked every time appendEncoded above is invoked.
// this is to count the number of records in a blob and then use that count as metadata of the blob.
currentBlobWriterComponents.azureBlobOutputStream.incrementNumberOfRecordsInBlob();
}
}
/**
* This method flushes all records written to the dataFileWriter into the underlying AzureBlobOutputStream.
* dataFileWriter.flush then explicitly invokes flush on the AzureBlobOutputStream,
* which asynchronously uploads the contents of the output stream as a block and reinitializes the stream.
* AzureBlobOutputStream.flush is not guaranteed to happen if dataFileWriter.flush fails.
* In that scenario, the current block is not uploaded and the blocks uploaded so far for this blob are lost.
* {@inheritDoc}
* @throws IOException if underlying dataFileWriter.flush fails
*/
@Override
public void flush() throws IOException {
synchronized (currentDataFileWriterLock) {
currentBlobWriterComponents.dataFileWriter.flush();
}
}
/**
* This method closes all DataFileWriters and output streams associated with all the blobs created.
* flush should be explicitly called before close.
* {@inheritDoc}
* @throws IllegalStateException when closing a closed writer
* @throws SamzaException if underlying DataFileWriter.close fails
*/
@Override
public void close() {
synchronized (currentDataFileWriterLock) {
if (isClosed) {
throw new IllegalStateException("Attempting to close an already closed AzureBlobAvroWriter");
}
allBlobWriterComponents.forEach(blobWriterComponents -> {
try {
closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
blobWriterComponents.blockBlobAsyncClient);
} catch (IOException e) {
throw new SamzaException(e);
}
});
isClosed = true;
}
}
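/**
* Test-only constructor that allows injecting a pre-built DataFileWriter, AzureBlobOutputStream and
* BlockBlobAsyncClient as the writer components of the current blob. If any of them is null, the components are
* left unset and the first write creates them from scratch, exactly as with the public constructor.
*/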
@VisibleForTesting
AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, AzureBlobWriterMetrics metrics,
Executor blobThreadPool, int maxBlockFlushThresholdSize, int flushTimeoutMs, String blobURLPrefix,
DataFileWriter<IndexedRecord> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
} else {
this.currentBlobWriterComponents =
new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
// only track non-null components; a null entry in allBlobWriterComponents would make close() fail with an NPE
this.allBlobWriterComponents.add(this.currentBlobWriterComponents);
}
this.blobThreadPool = blobThreadPool;
this.blobURLPrefix = blobURLPrefix;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
this.flushTimeoutMs = flushTimeoutMs;
this.compression = compression;
this.containerAsyncClient = containerAsyncClient;
this.useRandomStringInBlobName = useRandomStringInBlobName;
this.maxBlobSize = maxBlobSize;
this.maxRecordsPerBlob = maxRecordsPerBlob;
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
}
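/**
* Encodes the given record into Avro binary form (datum bytes only, without the Avro container-file framing).
* The resulting byte[] is suitable for {@link org.apache.avro.file.DataFileWriter#appendEncoded(java.nio.ByteBuffer)}.
*/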
@VisibleForTesting
byte[] encodeRecord(IndexedRecord record) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Schema schema = record.getSchema();
try {
EncoderFactory encoderFactory = new EncoderFactory();
BinaryEncoder encoder = encoderFactory.binaryEncoder(out, null);
DatumWriter<IndexedRecord> writer;
if (record instanceof SpecificRecord) {
writer = new SpecificDatumWriter<>(schema);
} else {
writer = new GenericDatumWriter<>(schema);
}
writer.write(record, encoder);
encoder.flush(); //encoder may buffer
} catch (Exception e) {
throw new SamzaException("Unable to serialize Avro record using schema within the record: " + schema.toString(), e);
}
return out.toByteArray();
}
private static SimpleDateFormat buildUTCFormatter() {
SimpleDateFormat formatter = new SimpleDateFormat(PUBLISHED_FILE_NAME_DATE_FORMAT);
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
return formatter;
}
private void closeDataFileWriter(DataFileWriter dataFileWriter, AzureBlobOutputStream azureBlobOutputStream,
BlockBlobAsyncClient blockBlobAsyncClient) throws IOException {
try {
LOG.info("Closing the blob: {}", blockBlobAsyncClient.getBlobUrl().toString());
// dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
dataFileWriter.close();
} catch (Exception e) {
LOG.error("Exception occurred during DataFileWriter.close for blob "
+ blockBlobAsyncClient.getBlobUrl()
+ ". All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.");
throw e;
}
}
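/**
* Flushes and releases the buffers of the current blob (if any), derives the schema and datumWriter from the given
* record if they are not yet known, and then creates the next blob with a UTC-timestamp based name and a fresh
* DataFileWriter wrapped around a new AzureBlobOutputStream.
*/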
private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws IOException {
if (currentBlobWriterComponents != null) {
LOG.info("Starting new blob as current blob size is "
+ currentBlobWriterComponents.azureBlobOutputStream.getSize()
+ " and max blob size is " + maxBlobSize
+ " or number of records is " + recordsInCurrentBlob
+ " and max records in blob is " + maxRecordsPerBlob);
currentBlobWriterComponents.dataFileWriter.flush();
currentBlobWriterComponents.azureBlobOutputStream.releaseBuffer();
recordsInCurrentBlob = 0;
}
// datumWriter is null when AzureBlobAvroWriter is created but has not yet received a message.
// optionalIndexedRecord is the first message in this case.
if (datumWriter == null) {
if (optionalIndexedRecord.isPresent()) {
IndexedRecord record = optionalIndexedRecord.get();
schema = record.getSchema();
if (record instanceof SpecificRecord) {
datumWriter = new SpecificDatumWriter<>(schema);
} else {
datumWriter = new GenericDatumWriter<>(schema);
}
} else {
throw new IllegalStateException("Writing without schema setup.");
}
}
String blobURL;
if (useRandomStringInBlobName) {
blobURL = String.format(BLOB_NAME_RANDOM_STRING_AVRO, blobURLPrefix,
UTC_FORMATTER.format(System.currentTimeMillis()), UUID.randomUUID().toString().substring(0, 8),
compression.getFileExtension());
} else {
blobURL = String.format(BLOB_NAME_AVRO, blobURLPrefix,
UTC_FORMATTER.format(System.currentTimeMillis()), compression.getFileExtension());
}
LOG.info("Creating new blob: {}", blobURL);
BlockBlobAsyncClient blockBlobAsyncClient = containerAsyncClient.getBlobAsyncClient(blobURL).getBlockBlobAsyncClient();
DataFileWriter<IndexedRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
AzureBlobOutputStream azureBlobOutputStream;
try {
azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
} catch (Exception e) {
throw new SamzaException("Unable to create AzureBlobOutputStream", e);
}
dataFileWriter.create(schema, azureBlobOutputStream);
dataFileWriter.setFlushOnEveryBlock(false);
this.currentBlobWriterComponents = new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
allBlobWriterComponents.add(this.currentBlobWriterComponents);
LOG.info("Created new blob: {}", blobURL);
}
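// DATAFILEWRITER_OVERHEAD is added because DataFileWriter buffers data internally and writes container metadata,
// so the bytes already handed to the output stream understate the final blob size.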
private boolean willCurrentBlobExceedSize(byte[] encodedRecord) {
AzureBlobOutputStream azureBlobOutputStream = currentBlobWriterComponents.azureBlobOutputStream;
return (azureBlobOutputStream.getSize() + encodedRecord.length + DATAFILEWRITER_OVERHEAD) > maxBlobSize;
}
private boolean willCurrentBlobExceedRecordLimit() {
return (recordsInCurrentBlob + 1) > maxRecordsPerBlob;
}
/**
* Holds the components needed to write to an Azure Blob
* - including Avro's DataFileWriter, AzureBlobOutputStream and Azure's BlockBlobAsyncClient
*/
private class BlobWriterComponents {
final DataFileWriter<IndexedRecord> dataFileWriter;
final AzureBlobOutputStream azureBlobOutputStream;
final BlockBlobAsyncClient blockBlobAsyncClient;
public BlobWriterComponents(DataFileWriter<IndexedRecord> dataFileWriter, AzureBlobOutputStream azureBlobOutputStream,
BlockBlobAsyncClient blockBlobAsyncClient) {
Preconditions.checkNotNull(dataFileWriter, "DataFileWriter can not be null when creating WriterComponents for an Azure Blob.");
Preconditions.checkNotNull(azureBlobOutputStream, "AzureBlobOutputStream can not be null when creating WriterComponents for an Azure Blob.");
Preconditions.checkNotNull(blockBlobAsyncClient, "BlockBlobAsyncClient can not be null when creating WriterComponents for an Azure Blob.");
this.dataFileWriter = dataFileWriter;
this.azureBlobOutputStream = azureBlobOutputStream;
this.blockBlobAsyncClient = blockBlobAsyncClient;
}
}
}