/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.streamer;

import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

/**
* Data streamer subscriber.
*
* @param <T> Item type.
* @param <P> Partition type.
*/
public class StreamerSubscriber<T, P> implements Subscriber<T> {
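    // Usage sketch (illustrative only): the Item/Partition type parameters and the sender, partitionAwarenessProvider,
    // options, log and items references below are placeholders; any Flow.Publisher implementation can act as the
    // upstream source, e.g. a JDK SubmissionPublisher:
    //
    //   StreamerSubscriber<Item, Partition> subscriber =
    //           new StreamerSubscriber<>(sender, partitionAwarenessProvider, options, log, null);
    //
    //   try (SubmissionPublisher<Item> publisher = new SubmissionPublisher<>()) {
    //       publisher.subscribe(subscriber);
    //       items.forEach(publisher::submit);
    //   }
    //
    //   subscriber.completionFuture().join(); // Waits until every buffered batch is sent.
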
private final StreamerBatchSender<T, P> batchSender;
private final StreamerPartitionAwarenessProvider<T, P> partitionAwarenessProvider;
private final StreamerOptions options;
private final CompletableFuture<Void> completionFut = new CompletableFuture<>();
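
    // pendingItemCount tracks items that were requested from the upstream publisher but not yet received in onNext;
    // inFlightItemCount tracks items that were enlisted into batches and not yet acknowledged as sent.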
private final AtomicInteger pendingItemCount = new AtomicInteger();
private final AtomicInteger inFlightItemCount = new AtomicInteger();

    // NOTE: This can accumulate empty buffers for stopped/failed nodes. Cleaning up is not trivial in a concurrent scenario.
    // We don't expect thousands of node failures, so it should be fine.
private final ConcurrentHashMap<P, StreamerBuffer<T>> buffers = new ConcurrentHashMap<>();
private final ConcurrentMap<P, CompletableFuture<Void>> pendingRequests = new ConcurrentHashMap<>();
private final IgniteLogger log;
private final StreamerMetricSink metrics;
private @Nullable Flow.Subscription subscription;
private @Nullable ScheduledExecutorService flushTimer;
private @Nullable ScheduledFuture<?> flushTask;

    /**
     * Constructor.
     *
     * @param batchSender Batch sender.
     * @param partitionAwarenessProvider Partition awareness provider.
     * @param options Data streamer options.
     * @param log Logger.
     * @param metrics Metrics sink, or {@code null} if metrics are not required.
     */
public StreamerSubscriber(
StreamerBatchSender<T, P> batchSender,
StreamerPartitionAwarenessProvider<T, P> partitionAwarenessProvider,
StreamerOptions options,
IgniteLogger log,
@Nullable StreamerMetricSink metrics) {
assert batchSender != null;
assert partitionAwarenessProvider != null;
assert options != null;
assert log != null;
this.batchSender = batchSender;
this.partitionAwarenessProvider = partitionAwarenessProvider;
this.options = options;
this.log = log;
this.metrics = getMetrics(metrics);
    }

/** {@inheritDoc} */
@Override
public void onSubscribe(Subscription subscription) {
if (this.subscription != null) {
throw new IllegalStateException("Subscription is already set.");
}
this.subscription = subscription;
// Refresh schemas and partition assignment, then request initial batch.
partitionAwarenessProvider.refreshAsync()
.whenComplete((res, err) -> {
if (err != null) {
log.error("Failed to refresh schemas and partition assignment: " + err.getMessage(), err);
close(err);
} else {
initFlushTimer();
requestMore();
}
});
    }

/** {@inheritDoc} */
@Override
public void onNext(T item) {
pendingItemCount.decrementAndGet();
P partition = partitionAwarenessProvider.partition(item);
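        // Route the item to its per-partition buffer; the buffer invokes the enlistBatch callback below whenever a batch is ready.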
StreamerBuffer<T> buf = buffers.computeIfAbsent(
partition,
p -> new StreamerBuffer<>(options.batchSize(), items -> enlistBatch(p, items)));
buf.add(item);
this.metrics.streamerItemsQueuedAdd(1);
requestMore();
    }

/** {@inheritDoc} */
@Override
public void onError(Throwable throwable) {
close(throwable);
    }

/** {@inheritDoc} */
@Override
public void onComplete() {
close(null);
    }

/**
* Returns a future that will be completed once all the data is sent.
*
* @return Completion future.
*/
public CompletableFuture<Void> completionFuture() {
return completionFut;
    }

private void enlistBatch(P partition, Collection<T> batch) {
int batchSize = batch.size();
assert batchSize > 0 : "Batch size must be positive.";
assert partition != null : "Partition must not be null.";
inFlightItemCount.addAndGet(batchSize);
metrics.streamerBatchesActiveAdd(1);
pendingRequests.compute(
partition,
// Chain existing futures to preserve request order.
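                // As a result, batches destined for the same partition are sent strictly one after another,
                // while batches for different partitions can be in flight concurrently.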
(part, fut) -> fut == null ? sendBatch(part, batch) : fut.thenCompose(v -> sendBatch(part, batch))
);
    }

private CompletableFuture<Void> sendBatch(P partition, Collection<T> batch) {
        // If a connection fails, the batch goes to the default connection thanks to the built-in retry mechanism.
try {
return batchSender.sendAsync(partition, batch).whenComplete((res, err) -> {
if (err != null) {
// Retry is handled by the sender (RetryPolicy in ReliableChannel on the client, sendWithRetry on the server).
// If we get here, then retries are exhausted and we should fail the streamer.
log.error("Failed to send batch to partition " + partition + ": " + err.getMessage(), err);
close(err);
} else {
int batchSize = batch.size();
this.metrics.streamerBatchesSentAdd(1);
this.metrics.streamerBatchesActiveAdd(-1);
this.metrics.streamerItemsSentAdd(batchSize);
this.metrics.streamerItemsQueuedAdd(-batchSize);
inFlightItemCount.addAndGet(-batchSize);
requestMore();
// Refresh partition assignment asynchronously.
partitionAwarenessProvider.refreshAsync().exceptionally(refreshErr -> {
log.error("Failed to refresh schemas and partition assignment: " + refreshErr.getMessage(), refreshErr);
close(refreshErr);
return null;
});
}
});
} catch (Exception e) {
log.error("Failed to send batch to partition " + partition + ": " + e.getMessage(), e);
close(e);
return CompletableFuture.failedFuture(e);
}
    }

private void close(@Nullable Throwable throwable) {
if (flushTimer != null) {
assert flushTask != null;
flushTask.cancel(false);
IgniteUtils.shutdownAndAwaitTermination(flushTimer, 10, TimeUnit.SECONDS);
}
var s = subscription;
if (s != null) {
s.cancel();
}
if (throwable == null) {
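            // Graceful completion: flush the remaining buffered items and complete the user-facing future
            // once all pending per-partition sends finish.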
buffers.values().forEach(StreamerBuffer::flushAndClose);
var futs = pendingRequests.values().toArray(new CompletableFuture[0]);
CompletableFuture.allOf(futs).whenComplete(copyStateTo(completionFut));
} else {
completionFut.completeExceptionally(throwable);
}
    }

private void requestMore() {
// This method controls backpressure. We won't get more items than we requested.
// The idea is to have perNodeParallelOperations batches in flight for every connection.
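        // For example, with 4 partition buffers, batchSize = 1000 and perNodeParallelOperations = 4, the target is
        // 4 * 1000 * 4 = 16000 items either requested from the publisher or in flight at any given time.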
var pending = pendingItemCount.get();
var desiredInFlight = Math.max(1, buffers.size()) * options.batchSize() * options.perNodeParallelOperations();
var inFlight = inFlightItemCount.get();
var count = desiredInFlight - inFlight - pending;
if (count <= 0) {
return;
}
assert subscription != null;
subscription.request(count);
pendingItemCount.addAndGet(count);
    }

private void initFlushTimer() {
int interval = options.autoFlushFrequency();
if (interval <= 0) {
return;
}
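        // A dedicated single-threaded scheduler flushes the buffers periodically so that buffered items are not
        // stuck waiting for a full batch under low load.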
String threadPrefix = "client-data-streamer-flush-" + hashCode();
flushTimer = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadPrefix, log));
flushTask = flushTimer.scheduleAtFixedRate(this::flushBuffers, interval, interval, TimeUnit.MILLISECONDS);
    }

private void flushBuffers() {
buffers.values().forEach(StreamerBuffer::flush);
    }

private static StreamerMetricSink getMetrics(@Nullable StreamerMetricSink metrics) {
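        // Return the supplied sink as-is, or fall back to a no-op implementation so that callers can record metrics
        // without null checks.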
return metrics != null ? metrics : new StreamerMetricSink() {
@Override
public void streamerBatchesSentAdd(long batches) {
// No-op.
}
@Override
public void streamerItemsSentAdd(long items) {
// No-op.
}
@Override
public void streamerBatchesActiveAdd(long batches) {
// No-op.
}
@Override
public void streamerItemsQueuedAdd(long items) {
// No-op.
}
};
}
}