blob: 7b6a82e44b49e73e5e3307151cad712b02fe1e1c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.eventhub.producer;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.metrics.SamzaHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SystemProducer Helper that can be used to implement SystemProducer that provide async send APIs
* e.g. Kinesis, Event Hubs, etc..
* To implement an AsyncSystemProducer, you need to implement {@link AsyncSystemProducer#sendAsync}
*/
public abstract class AsyncSystemProducer implements SystemProducer {
private static final Logger LOG = LoggerFactory.getLogger(AsyncSystemProducer.class.getName());
private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
/**
* The constant CONFIG_STREAM_LIST. This config is used to get the list of streams produced by this EventHub system.
*/
public static final String CONFIG_STREAM_LIST = EventHubConfig.CONFIG_STREAM_LIST;
private static final String SEND_ERRORS = "sendErrors";
private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
private static final String SEND_LATENCY = "sendLatency";
/**
* The constant AGGREGATE.
*/
protected static final String AGGREGATE = "aggregate";
/**
* The Stream ids.
*/
protected final List<String> streamIds;
/**
* The Map between Physical streamName to virtual Samza streamids.
*/
protected final Map<String, String> physicalToStreamIds;
/**
* The Samza metrics registry used to store all the metrics emitted.
*/
protected final MetricsRegistry metricsRegistry;
/**
* The Pending futures corresponding to the send calls that are still in flight and for which we haven't
* received the call backs yet.
* This needs to be concurrent because it is accessed across the send/flush thread and callback thread.
*/
protected final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();
private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);
private static Counter aggSendErrors = null;
private static SamzaHistogram aggSendLatency = null;
private static SamzaHistogram aggSendCallbackLatency = null;
private final HashMap<String, SamzaHistogram> sendLatency = new HashMap<>();
private final HashMap<String, SamzaHistogram> sendCallbackLatency = new HashMap<>();
private final HashMap<String, Counter> sendErrors = new HashMap<>();
/**
* Instantiates a new Async system producer.
*
* @param systemName the system name
* @param config the config
* @param metricsRegistry the registry
*/
public AsyncSystemProducer(String systemName, Config config, MetricsRegistry metricsRegistry) {
StreamConfig sconfig = new StreamConfig(config);
streamIds = config.getList(String.format(CONFIG_STREAM_LIST, systemName));
physicalToStreamIds =
streamIds.stream().collect(Collectors.toMap(sconfig::getPhysicalName, Function.identity()));
this.metricsRegistry = metricsRegistry;
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void send(String source, OutgoingMessageEnvelope envelope) {
checkForSendCallbackErrors("Received exception on message send");
String streamName = envelope.getSystemStream().getStream();
String streamId = physicalToStreamIds.getOrDefault(streamName, streamName);
long beforeSendTimeMs = System.currentTimeMillis();
CompletableFuture<Void> sendResult = sendAsync(source, envelope);
long afterSendTimeMs = System.currentTimeMillis();
long latencyMs = afterSendTimeMs - beforeSendTimeMs;
sendLatency.get(streamId).update(latencyMs);
aggSendLatency.update(latencyMs);
pendingFutures.add(sendResult);
// Auto update the metrics and possible throwable when futures are complete.
sendResult.handle((aVoid, throwable) -> {
long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
sendCallbackLatency.get(streamId).update(callbackLatencyMs);
aggSendCallbackLatency.update(callbackLatencyMs);
if (throwable != null) {
sendErrors.get(streamId).inc();
aggSendErrors.inc();
LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
sendExceptionOnCallback.compareAndSet(null, throwable);
}
return aVoid;
});
}
public void start() {
streamIds.forEach(streamId -> {
sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
});
if (aggSendLatency == null) {
aggSendLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_LATENCY);
aggSendCallbackLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_CALLBACK_LATENCY);
aggSendErrors = metricsRegistry.newCounter(AGGREGATE, SEND_ERRORS);
}
}
/**
* Default implementation of the flush that just waits for all the pendingFutures to be complete.
* SystemProducer should override this, if the underlying system provides flush semantics.
* @param source String representing the source of the message.
*/
@Override
public synchronized void flush(String source) {
long incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
LOG.info("Trying to flush pending {} sends.", incompleteSends);
checkForSendCallbackErrors("Received exception on message send.");
CompletableFuture<Void> future =
CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
try {
// Block until all the pending sends are complete or timeout.
future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
String msg = String.format("Flush failed with error. Total pending sends %d", incompleteSends);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
pendingFutures.clear();
checkForSendCallbackErrors("Sending one or more of the messages failed during flush.");
}
/**
* Sends a specified message envelope from a specified Samza source.
* @param source String representing the source of the message.
* @param envelope Aggregate object representing the serialized message to send from the source.
* @return the completable future for the async send call
*/
public abstract CompletableFuture<Void> sendAsync(String source, OutgoingMessageEnvelope envelope);
/**
* This method is used to check whether there were any previous exceptions in the send call backs.
* And if any exception occurs the producer is considered to be stuck/broken
* @param msg the msg that is passed to the exception.
*/
protected void checkForSendCallbackErrors(String msg) {
// Check for send errors
Throwable sendThrowable = sendExceptionOnCallback.get();
if (sendThrowable != null) {
LOG.error(msg, sendThrowable);
throw new SamzaException(msg, sendThrowable);
}
}
}