blob: 3912bcfcaeb54c854a9c2d798d2aa56a8df2fbf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.eventhub.producer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.eventhub.EventHubClientManager;
import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.impl.ClientConstants;
import com.microsoft.azure.eventhubs.impl.EventDataImpl;
/**
* EventHub system producer that can be used in Samza jobs to send events to Azure EventHubs
*/
public class EventHubSystemProducer extends AsyncSystemProducer {
private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
private static final long DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
public static final String KEY = "key";
// Metrics recording
private static final String EVENT_SKIP_RATE = "eventSkipRate";
private static final String EVENT_WRITE_RATE = "eventWriteRate";
private static final String EVENT_BYTE_WRITE_RATE = "eventByteWriteRate";
private static final Object AGGREGATE_METRICS_LOCK = new Object();
public enum PartitioningMethod {
ROUND_ROBIN, EVENT_HUB_HASHING, PARTITION_KEY_AS_PARTITION
}
/**
* Per stream metrics. Key is the stream Name.
*/
private final HashMap<String, Counter> eventSkipRate = new HashMap<>();
private final HashMap<String, Counter> eventWriteRate = new HashMap<>();
private final HashMap<String, Counter> eventByteWriteRate = new HashMap<>();
/**
* Aggregated metrics.
*/
private static Counter aggEventSkipRate = null;
private static Counter aggEventWriteRate = null;
private static Counter aggEventByteWriteRate = null;
private final EventHubConfig config;
private final PartitioningMethod partitioningMethod;
private final String systemName;
private final int maxMessageSize;
private volatile boolean isStarted = false;
// We implement lazy initialization for producer as a workaround for
// slow shutdown issue.
private boolean isInitialized = false;
/**
* Per partition event hub client. Partitions from the same stream may share the same client,
* depends on config PerPartitionConnection. See {@link EventHubConfig}
*/
@VisibleForTesting
final Map<String, Map<Integer, EventHubClientManager>> perPartitionEventHubClients = new HashMap<>();
private final Map<String, EventHubClientManager> perStreamEventHubClientManagers = new HashMap<>();
/**
* PartitionSender for each partition in the stream.
*/
private final Map<String, Map<Integer, PartitionSender>> streamPartitionSenders = new HashMap<>();
/**
* Per stream message interceptors
*/
private final Map<String, Interceptor> interceptors;
private final EventHubClientManagerFactory eventHubClientManagerFactory;
public EventHubSystemProducer(EventHubConfig config, String systemName,
EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
MetricsRegistry registry) {
super(systemName, config, registry);
LOG.info("Creating EventHub Producer for system {}", systemName);
this.config = config;
this.systemName = systemName;
this.partitioningMethod = config.getPartitioningMethod(systemName);
this.interceptors = interceptors;
this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
this.eventHubClientManagerFactory = eventHubClientManagerFactory;
}
private void init() {
LOG.info("Initializing EventHubSystemProducer");
// Fetches the stream ids
List<String> streamIds = config.getStreams(systemName);
// Create and initiate connections to Event Hubs
// even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching
// partition count)
for (String streamId : streamIds) {
EventHubClientManager ehClient =
eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
perStreamEventHubClientManagers.put(streamId, ehClient);
ehClient.init();
}
// Create partition senders if required
if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
// Create all partition senders
perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
try {
Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
Integer numPartitions =
ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
for (int i = 0; i < numPartitions; i++) {
String partitionId = String.valueOf(i);
EventHubClientManager perPartitionClientManager =
createOrGetEventHubClientManagerForPartition(streamId, i);
PartitionSender partitionSender =
perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
partitionSenders.put(i, partitionSender);
}
streamPartitionSenders.put(streamId, partitionSenders);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
throw new SamzaException(msg, e);
} catch (EventHubException | IllegalArgumentException e) {
String msg = "Creation of partition sender failed with exception";
throw new SamzaException(msg, e);
}
});
}
isInitialized = true;
LOG.info("EventHubSystemProducer initialized.");
}
@Override
public synchronized void register(String source) {
LOG.info("Registering source {}", source);
if (isStarted) {
String msg = "Cannot register once the producer is started.";
throw new SamzaException(msg);
}
}
private EventHubClientManager createOrGetEventHubClientManagerForPartition(String streamId, int partitionId) {
Map<Integer, EventHubClientManager> perStreamMap =
perPartitionEventHubClients.computeIfAbsent(streamId, key -> new HashMap<>());
EventHubClientManager eventHubClientManager;
if (config.getPerPartitionConnection(systemName)) {
// will create one EventHub client per partition
if (perStreamMap.containsKey(partitionId)) {
LOG.warn(String.format("Trying to create new EventHubClientManager for partition=%d. But one already exists",
partitionId));
eventHubClientManager = perStreamMap.get(partitionId);
} else {
LOG.info(
String.format("Creating EventHub client manager for streamId=%s, partitionId=%d: ", streamId, partitionId));
eventHubClientManager = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
eventHubClientManager.init();
perStreamMap.put(partitionId, eventHubClientManager);
}
} else {
// will share one EventHub client per stream
eventHubClientManager = perStreamEventHubClientManagers.get(streamId);
perStreamMap.put(partitionId, eventHubClientManager);
}
Validate.notNull(eventHubClientManager,
String.format("Fail to create or get EventHubClientManager for streamId=%s, partitionId=%d", streamId,
partitionId));
return eventHubClientManager;
}
@Override
public synchronized void start() {
super.start();
LOG.info("Starting system producer.");
// Initiate metrics
streamIds.forEach((streamId) -> {
eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
eventWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE));
eventByteWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
});
// Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
synchronized (AGGREGATE_METRICS_LOCK) {
if (aggEventWriteRate == null) {
aggEventSkipRate = metricsRegistry.newCounter(AGGREGATE, EVENT_SKIP_RATE);
aggEventWriteRate = metricsRegistry.newCounter(AGGREGATE, EVENT_WRITE_RATE);
aggEventByteWriteRate = metricsRegistry.newCounter(AGGREGATE, EVENT_BYTE_WRITE_RATE);
}
}
isStarted = true;
}
@Override
public synchronized void flush(String source) {
super.flush(source);
}
@Override
public synchronized CompletableFuture<Void> sendAsync(String source, OutgoingMessageEnvelope envelope) {
LOG.debug(String.format("Trying to send %s", envelope));
if (!isStarted) {
throw new SamzaException("Trying to call send before the producer is started.");
}
if (!isInitialized) {
// lazy initialization on the first send
init();
}
String streamId = config.getStreamId(envelope.getSystemStream().getStream());
if (!perStreamEventHubClientManagers.containsKey(streamId)) {
String msg = String.format("Trying to send event to a destination {%s} that is not registered.", streamId);
throw new SamzaException(msg);
}
EventData eventData = createEventData(streamId, envelope);
// SAMZA-1654: waiting for the client library to expose the API to calculate the exact size of the AMQP message
// https://github.com/Azure/azure-event-hubs-java/issues/305
int eventDataLength = eventData.getBytes() == null ? 0 : eventData.getBytes().length;
// If the maxMessageSize is lesser than zero, then it means there is no message size restriction.
if (this.maxMessageSize > 0 && eventDataLength > this.maxMessageSize) {
LOG.info("Received a message with size {} > maxMessageSize configured {(}), Skipping it", eventDataLength,
this.maxMessageSize);
eventSkipRate.get(streamId).inc();
aggEventSkipRate.inc();
return CompletableFuture.completedFuture(null);
}
eventWriteRate.get(streamId).inc();
aggEventWriteRate.inc();
eventByteWriteRate.get(streamId).inc(eventDataLength);
aggEventByteWriteRate.inc(eventDataLength);
EventHubClientManager ehClient = perStreamEventHubClientManagers.get(streamId);
// Async send call
return sendToEventHub(streamId, eventData, getEnvelopePartitionId(envelope), ehClient.getEventHubClient());
}
private CompletableFuture<Void> sendToEventHub(String streamId, EventData eventData, Object partitionKey,
EventHubClient eventHubClient) {
if (PartitioningMethod.ROUND_ROBIN.equals(partitioningMethod)) {
return eventHubClient.send(eventData);
} else if (PartitioningMethod.EVENT_HUB_HASHING.equals(partitioningMethod)) {
if (partitionKey == null) {
throw new SamzaException("Partition key cannot be null for EventHub hashing");
}
return eventHubClient.send(eventData, convertPartitionKeyToString(partitionKey));
} else if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
if (!(partitionKey instanceof Integer)) {
String msg = "Partition key should be of type Integer";
throw new SamzaException(msg);
}
Integer numPartition = streamPartitionSenders.get(streamId).size();
Integer destinationPartition = (Integer) partitionKey % numPartition;
PartitionSender sender = streamPartitionSenders.get(streamId).get(destinationPartition);
return sender.send(eventData);
} else {
throw new SamzaException("Unknown partitioning method " + partitioningMethod);
}
}
protected Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) {
return envelope.getPartitionKey() == null ? envelope.getKey() : envelope.getPartitionKey();
}
private String convertPartitionKeyToString(Object partitionKey) {
String partitionKeyStr;
if (partitionKey instanceof String) {
partitionKeyStr = (String) partitionKey;
} else if (partitionKey instanceof Integer) {
partitionKeyStr = String.valueOf(partitionKey);
} else if (partitionKey instanceof byte[]) {
partitionKeyStr = new String((byte[]) partitionKey, Charset.defaultCharset());
} else {
throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString());
}
if (partitionKeyStr != null && partitionKeyStr.length() > ClientConstants.MAX_PARTITION_KEY_LENGTH) {
LOG.debug("Length of partition key: {} exceeds limit: {}. Truncating.", partitionKeyStr.length(),
ClientConstants.MAX_PARTITION_KEY_LENGTH);
partitionKeyStr = partitionKeyStr.substring(0, ClientConstants.MAX_PARTITION_KEY_LENGTH);
}
return partitionKeyStr;
}
protected EventData createEventData(String streamId, OutgoingMessageEnvelope envelope) {
Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamId, null));
byte[] eventValue = (byte[]) envelope.getMessage();
if (interceptor.isPresent()) {
eventValue = interceptor.get().intercept(eventValue);
}
EventData eventData = new EventDataImpl(eventValue);
eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis()));
if (config.getSendKeyInEventProperties(systemName)) {
String keyValue = "";
if (envelope.getKey() != null) {
keyValue = (envelope.getKey() instanceof byte[]) ? new String((byte[]) envelope.getKey())
: envelope.getKey().toString();
}
eventData.getProperties().put(KEY, keyValue);
}
return eventData;
}
@Override
public synchronized void stop() {
LOG.info("Stopping producer.");
streamPartitionSenders.values().forEach((streamPartitionSender) -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
LOG.error("Closing the partition sender failed ", e);
}
});
perStreamEventHubClientManagers.values()
.parallelStream()
.forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
perStreamEventHubClientManagers.clear();
if (config.getPerPartitionConnection(systemName)) {
perPartitionEventHubClients.values()
.stream()
.flatMap(map -> map.values().stream())
.forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
perPartitionEventHubClients.clear();
}
isStarted = false;
isInitialized = false;
LOG.info("EventHubSystemProducer stopped.");
}
/**
* Gets pending futures. Used by the test.
*
* @return the pending futures
*/
Collection<CompletableFuture<Void>> getPendingFutures() {
return pendingFutures;
}
}