blob: d89f38f32d6c1f774447d7a9a13b0308f957bc72 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.system.azureblob.producer;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.CompressionFactory;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemProducerException;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* AzureBlob system producer to send messages to Azure Blob Storage.
* This system producer is thread safe.
* For different sources: sends/flushes can happen in parallel.
* For same source: It supports sends in parallel. flushes are exclusive.
* Azure Blob Storage has a 3 level hierarchy: an Azure account contains multiple containers (akin to directories
* in a file system) and each container has multiple blobs (akin to files).
* Azure Container: System name maps to the name of Azure container.
* An instance of a system producer writes to a single Azure container considering the container as a system.
* Azure Blob: For a given stream-partition pair, a blob is created with name stream/partition/timestamp-randomString.
* The stream and partition are extracted from the SSP of OutgoingMessageEnvelope in send().
* Blob is started when the first message for that stream-partition is sent by a source
* and closed during flush for that source.
* Subsequent sends by the source to the same stream-partition will create a new blob with a different timestamp.
* Thus, timestamp corresponds to writer creation time i.e; the first send for source-SSP
* or first send after a flush for the source.
* If max blob size or record limit are configured, then a new blob is started when limits exceed.
* A random string is used as a suffix in the blob name to prevent collisions:
* - if two system producers are writing to the same SSP.
* - if two sources send to the same SSP.
* Lifecycle of the system producer is shown below. All sources have to be registered before starting the producer.
* Several messages can be sent by a source via send(source, envelope). This can be followed by a flush(source) or stop()
* After flush(source), more messages can be sent for that source and other sources as well. stop() internally calls
* flush(source) for all the sources registered. After stop(), no calls to send and flush are allowed.
* ┌──────────────────────────────┐
* │ │
* ▼ │
* Lifecycle: register(source) ────────▶ start() ──────▶ send(source, envelope) ──────▶ flush(source) ──────▶ stop()
* [multiple times │ ▲ │ ▲
* for └────┘ └─────────────────────────────────────┘
* multiple sources]
* This SystemProducer does not open up the envelopes sent through it. It is the responsibility of the user of this
* SystemProducer to ensure the envelopes are valid and a correct writer has been chosen by wiring up the
* writer factory config.
public class AzureBlobSystemProducer implements SystemProducer {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobSystemProducer.class.getName());
private static final String BLOB_NAME_PREFIX = "%s";
private static final String BLOB_NAME_PARTITION_PREFIX = "%s/%s";
private static final String AZURE_URL = "";
private static final int PREMIUM_MAX_BLOCK_SIZE = 100 * 1024 * 1024; // 100MB
private static final int STANDARD_MAX_BLOCK_SIZE = 4 * 1024 * 1024; // 4MB
private BlobContainerAsyncClient containerAsyncClient;
private final String systemName;
private final AzureBlobConfig config;
// Map of writers indexed first by sourceName and then by (streamName, partitionName) or just streamName if partition key does not exist.
private final Map<String, Map<String, AzureBlobWriter>> writerMap;
private final AzureBlobWriterFactory writerFactory;
private final int blockFlushThresholdSize;
private final long flushTimeoutMs;
private final long closeTimeout;
private final ThreadPoolExecutor asyncBlobThreadPool;
private volatile boolean isStarted = false;
private volatile boolean isStopped = false;
private final AzureBlobSystemProducerMetrics metrics;
private final Map<String, Object> sourceWriterCreationLockMap = new ConcurrentHashMap<>();
private final Map<String, ReadWriteLock> sourceSendFlushLockMap = new ConcurrentHashMap<>();
private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private final Config blobMetadataGeneratorConfig;
public AzureBlobSystemProducer(String systemName, AzureBlobConfig config, MetricsRegistry metricsRegistry) {
Preconditions.checkNotNull(systemName, "System name can not be null when creating AzureBlobSystemProducer");
Preconditions.checkNotNull(config, "Config can not be null when creating AzureBlobSystemProducer");
Preconditions.checkNotNull(metricsRegistry, "Metrics registry can not be null when creating AzureBlobSystemProducer");
// Azure logs do not show without this property set
System.setProperty(Configuration.PROPERTY_AZURE_LOG_LEVEL, "1");
this.systemName = systemName;
this.config = config;
String writerFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName);
try {
this.writerFactory = (AzureBlobWriterFactory) Class.forName(writerFactoryClassName).newInstance();
} catch (Exception e) {
throw new SystemProducerException("Could not create writer factory with name " + writerFactoryClassName, e);
this.flushTimeoutMs = this.config.getFlushTimeoutMs(this.systemName);
this.closeTimeout = this.config.getCloseTimeoutMs(this.systemName);
this.blockFlushThresholdSize = this.config.getMaxFlushThresholdSize(this.systemName);
int asyncBlobThreadPoolCount = this.config.getAzureBlobThreadPoolCount(this.systemName);
int blockingQueueSize = this.config.getBlockingQueueSize(this.systemName);"SystemName: {} block flush size:{}", systemName, this.blockFlushThresholdSize);"SystemName: {} thread count:{}", systemName, asyncBlobThreadPoolCount);
linkedBlockingDeque = new LinkedBlockingDeque<>(blockingQueueSize);
this.asyncBlobThreadPool =
new ThreadPoolExecutor(asyncBlobThreadPoolCount, asyncBlobThreadPoolCount, 60,
TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
this.writerMap = new ConcurrentHashMap<>();
this.metrics = new AzureBlobSystemProducerMetrics(systemName, config.getAzureAccountName(systemName), metricsRegistry);
String blobMetadataGeneratorFactoryClassName = this.config.getSystemBlobMetadataPropertiesGeneratorFactory(this.systemName);
try {
blobMetadataGeneratorFactory = (BlobMetadataGeneratorFactory) Class.forName(blobMetadataGeneratorFactoryClassName).newInstance();
} catch (Exception e) {
throw new SystemProducerException("Could not create blob metadata generator factory with name " + blobMetadataGeneratorFactoryClassName, e);
blobMetadataGeneratorConfig = this.config.getSystemBlobMetadataGeneratorConfigs(systemName);
* {@inheritDoc}
* @throws SystemProducerException
public synchronized void start() {
if (isStarted) {
LOG.warn("Attempting to start an already started producer.");
String accountName = config.getAzureAccountName(systemName);
String accountKey = config.getAzureAccountKey(systemName);
setupAzureContainer(accountName, accountKey);"Starting producer.");
isStarted = true;
* {@inheritDoc}
* @throws SystemProducerException
public synchronized void stop() {
if (!isStarted) {
LOG.warn("Attempting to stop a producer that was not started.");
if (isStopped) {
LOG.warn("Attempting to stop an already stopped producer.");
try {
writerMap.forEach((source, sourceWriterMap) -> flush(source));
isStarted = false;
} catch (Exception e) {
throw new SystemProducerException("Stop failed with exception.", e);
} finally {
isStopped = true;
* {@inheritDoc}
* @throws SystemProducerException
public synchronized void register(String source) {"Registering source {}", source);
if (isStarted) {
throw new SystemProducerException("Cannot register once the producer is started.");
if (writerMap.containsKey(source)) {
// source already registered => writerMap and metrics have entries for the source
LOG.warn("Source: {} already registered", source);
writerMap.put(source, new ConcurrentHashMap<>());
sourceWriterCreationLockMap.put(source, new Object());
sourceSendFlushLockMap.put(source, new ReentrantReadWriteLock());
* Multi-threading and thread-safety:
* From Samza usage of SystemProducer:
* The lifecycle of SystemProducer shown above is consistent with most use cases within Samza (with the exception of
* Coordinator stream store/producer and KafkaCheckpointManager).
* A single parent thread creates the SystemProducer, registers all sources and starts it before handing it
* to multiple threads for use (send and flush). Finally, the single parent thread stops the producer.
* The most frequent operations on a SystemProducer are send and flush while register, start and stop are one-time operations.
* Based on this usage pattern: to provide multi-threaded support and improve throughput of this SystemProducer,
* multiple sends and flushes need to happen in parallel. However, the following rules are needed to ensure
* o data loss and data consistency.
* 1. sends can happen in parallel for same source or different sources.
* 2. send and flush for the same source can not happen in parallel. Although, the AzureBlobWriter is thread safe,
* interleaving write and flush and close operations of a writer can lead to data loss if a write happens between flush and close.
* There are other scenarios such as issuing a write to the writer after close and so on.
* 3. writer creation for the same writer key (SSP) can not happen in parallel - for the reason that multiple
* writers could get created with only one being retained but all being used and GCed after a send, leading to data loss.
* These 3 rules are achieved by using a per source ReadWriteLock to allow sends in parallel but guarantee exclusivity for flush.
* Additionally, a per source lock is used to ensure writer creation is in a critical section.
* Concurrent access to shared objects as follows:
* 1. AzureBlobWriters is permitted as long as there are no interleaving of operations for a writer.
* If multiple operations of writer (as in flush) then make it synchronized.
* 2. ConcurrentHashMaps (esp writerMap per source) get and put - disallow interleaving by doing put and clear under locks.
* 3. WriterFactory and Metrics are thread-safe. WriterFactory is stateless while Metrics' operations interleaving
* is thread-safe too as they work on different counters.
* The above locking mechanisms ensure thread-safety.
* {@inheritDoc}
* @throws SystemProducerException
public void send(String source, OutgoingMessageEnvelope messageEnvelope) {
if (!isStarted) {
throw new SystemProducerException("Trying to send before producer has started.");
if (isStopped) {
throw new SystemProducerException("Sending after producer has been stopped.");
ReadWriteLock lock = sourceSendFlushLockMap.get(source);
if (lock == null) {
throw new SystemProducerException("Attempting to send to source: " + source + " but it was not registered");
try {
AzureBlobWriter writer = getOrCreateWriter(source, messageEnvelope);
} catch (Exception e) {
Object partitionKey = getPartitionKey(messageEnvelope);
String msg = "Send failed for source: " + source + ", system: " + systemName
+ ", stream: " + messageEnvelope.getSystemStream().getStream()
+ ", partitionKey: " + ((partitionKey != null) ? partitionKey : "null");
throw new SystemProducerException(msg, e);
} finally {
* {@inheritDoc}
* @throws SystemProducerException
public void flush(String source) {
if (!isStarted) {
throw new SystemProducerException("Trying to flush before producer has started.");
if (isStopped) {
throw new SystemProducerException("Flushing after producer has been stopped.");
ReadWriteLock lock = sourceSendFlushLockMap.get(source);
if (lock == null) {
throw new SystemProducerException("Attempting to flush source: " + source + " but it was not registered");
Map<String, AzureBlobWriter> sourceWriterMap = writerMap.get(source);
try {
// first flush all the writers
// then close and remove all the writers
closeWriters(source, sourceWriterMap);
} catch (Exception e) {
throw new SystemProducerException("Flush failed for system:" + systemName + " and source: " + source, e);
} finally {
void setupAzureContainer(String accountName, String accountKey) {
try {
// Use your Azure Blob Storage account's name and key to create a credential object to access your account.
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
HttpClient httpClient;
if (config.getUseProxy(systemName)) {"HTTP Proxy setup for AzureBlob pipeline");
httpClient = new NettyAsyncHttpClientBuilder()
.proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
new InetSocketAddress(config.getAzureProxyHostname(systemName), config.getAzureProxyPort(systemName)))).build();
} else {
httpClient = HttpClient.createDefault();
// From the Azure portal, get your Storage account blob service AsyncClient endpoint.
String endpoint = String.format(Locale.ROOT, AZURE_URL, accountName);
HttpLogOptions httpLogOptions = new HttpLogOptions();
BlobServiceAsyncClient storageClient =
new BlobServiceClientBuilder()
SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
throw new SystemProducerException("Azure storage account with name: " + accountName
+ " is a premium account and can only handle upto " + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+ flushThresholdSize);
} else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
throw new SystemProducerException("Azure storage account with name: " + accountName
+ " is a standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+ flushThresholdSize);
containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);
// Only way to check if container exists or not is by creating it and look for failure/success.
} catch (Exception e) {
throw new SystemProducerException("Failed to set up Azure container for SystemName: " + systemName, e);
* // find the writer in the writerMap else create one
* @param source for which to find/create the writer
* @param messageEnvelope to fetch the schema from if writer needs to be created
* @return an AzureBlobWriter object
AzureBlobWriter getOrCreateWriter(String source, OutgoingMessageEnvelope messageEnvelope) {
String writerMapKey;
String blobURLPrefix;
String partitionKey = getPartitionKey(messageEnvelope);
// using most significant bits in UUID (8 digits) to avoid collision in blob names
if (partitionKey == null) {
writerMapKey = messageEnvelope.getSystemStream().getStream();
blobURLPrefix = String.format(BLOB_NAME_PREFIX, messageEnvelope.getSystemStream().getStream());
} else {
writerMapKey = messageEnvelope.getSystemStream().getStream() + "/" + partitionKey;
blobURLPrefix = String.format(BLOB_NAME_PARTITION_PREFIX, messageEnvelope.getSystemStream().getStream(), partitionKey);
Map<String, AzureBlobWriter> sourceWriterMap = writerMap.get(source);
if (sourceWriterMap == null) {
throw new SystemProducerException("Attempting to send to source: " + source + " but it is not registered");
AzureBlobWriter writer = sourceWriterMap.get(writerMapKey);
if (writer == null) {
synchronized (sourceWriterCreationLockMap.get(source)) {
writer = sourceWriterMap.get(writerMapKey);
if (writer == null) {
AzureBlobWriterMetrics writerMetrics =
new AzureBlobWriterMetrics(metrics.getAggregateMetrics(), metrics.getSystemMetrics(), metrics.getSourceMetrics(source));
writer = createNewWriter(blobURLPrefix, writerMetrics, messageEnvelope.getSystemStream().getStream());
sourceWriterMap.put(writerMapKey, writer);
return writer;
private void createContainerIfNotExists(BlobContainerAsyncClient containerClient) {
try {
} catch (BlobStorageException e) {
//StorageErrorCode defines constants corresponding to all error codes returned by the service.
if (e.getErrorCode() == BlobErrorCode.RESOURCE_NOT_FOUND) {
HttpResponse response = e.getResponse();
LOG.error("Error creating the container url " + containerClient.getBlobContainerUrl().toString() + " with status code: " + response.getStatusCode(), e);
} else if (e.getErrorCode() == BlobErrorCode.CONTAINER_BEING_DELETED) {
LOG.error("Container is being deleted. Container URL is: " + containerClient.getBlobContainerUrl().toString(), e);
} else if (e.getErrorCode() == BlobErrorCode.CONTAINER_ALREADY_EXISTS) {
throw e;
private String getPartitionKey(OutgoingMessageEnvelope messageEnvelope) {
Object partitionKey = messageEnvelope.getPartitionKey();
if (partitionKey == null || !(partitionKey instanceof String)) {
return null;
return (String) partitionKey;
private void flushWriters(Map<String, AzureBlobWriter> sourceWriterMap) {
sourceWriterMap.forEach((stream, writer) -> {
try {"Flushing topic:{}", stream);
} catch (IOException e) {
throw new SystemProducerException("Close failed for topic " + stream, e);
private void closeWriters(String source, Map<String, AzureBlobWriter> sourceWriterMap) throws Exception {
Set<CompletableFuture<Void>> pendingClose = ConcurrentHashMap.newKeySet();
try {
sourceWriterMap.forEach((stream, writer) -> {"Closing topic:{}", stream);
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
public void run() {
try {
} catch (Exception e) {
throw new SystemProducerException("Close failed for topic " + stream, e);
}, asyncBlobThreadPool);
future.handle((aVoid, throwable) -> {
if (throwable != null) {
throw new SystemProducerException("Close failed for topic " + stream, throwable);
} else {"Blob close finished for stream " + stream);
return aVoid;
CompletableFuture<Void> future = CompletableFuture.allOf(pendingClose.toArray(new CompletableFuture[0]));"Flush source: {} has pending closes: {} ", source, pendingClose.size());
future.get((long) closeTimeout, TimeUnit.MILLISECONDS);
} finally {
AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMetrics, String streamName) {
try {
return writerFactory.getWriterInstance(containerAsyncClient, blobURL, asyncBlobThreadPool, writerMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
blockFlushThresholdSize, flushTimeoutMs,
} catch (Exception e) {
throw new RuntimeException("Failed to create a writer for the producer.", e);