// blob: a6f04b5c9b6eeafcc722340071cc1b885b0eaf7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.security.SecureRandom;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.qpid.protonj2.client.NextReceiverPolicy;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
/**
 * Implements the various strategies for selecting the next receiver that has
 * pending messages, and for parking requests until a new incoming message
 * arrives when no receiver currently has one.
 */
final class ClientNextReceiverSelector {

    /** Session attachment key under which the most recently returned receiver is stored. */
    private static final String LAST_RETURNED_STATE_KEY = "Last_Returned_State";

    /** Requests that could not be satisfied immediately, kept in arrival order. */
    private final ArrayDeque<ClientFuture<Receiver>> pending = new ArrayDeque<>();

    private final ClientSession session;

    /** Created lazily, the first time a RANDOM selection actually has candidates. */
    private SecureRandom srand;

    /**
     * Creates a selector bound to the given session and registers its delivery
     * read handler on the underlying proton session.
     *
     * @param session the client session whose receivers are selected from
     */
    public ClientNextReceiverSelector(ClientSession session) {
        this.session = session;

        // Same processing as reconnect for session delivery tapping
        handleReconnect();
    }

    /**
     * Attempts to complete the request with a receiver chosen by the given policy.
     * <p>
     * When no receiver currently has queued deliveries the request is queued until
     * a delivery arrives; if {@code timeout} is greater than zero the request is
     * completed with {@code null} once that many milliseconds pass without one.
     *
     * @param request the future to complete with the selected receiver
     * @param policy the selection policy to apply; must not be {@code null}
     * @param timeout wait limit in milliseconds; values of zero or less schedule no timeout
     *
     * @throws NullPointerException if {@code policy} is {@code null}
     */
    public void nextReceiver(ClientFuture<Receiver> request, NextReceiverPolicy policy, long timeout) {
        Objects.requireNonNull(policy, "The next receiver selection policy cannot be null");

        final ClientReceiver result;

        switch (policy) {
            case ROUND_ROBIN:
                result = selectNextAvailable();
                break;
            case FIRST_AVAILABLE:
                result = selectFirstAvailable();
                break;
            case LARGEST_BACKLOG:
                result = selectLargestBacklog();
                break;
            case SMALLEST_BACKLOG:
                result = selectSmallestBacklog();
                break;
            case RANDOM:
                result = selectRandomReceiver();
                break;
            default:
                request.failed(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
                // The request is already failed: it must not be queued as pending nor
                // have a timeout scheduled, otherwise a later delivery would be spent on it.
                return;
        }

        if (result == null) {
            pending.add(request); // Wait for the next incoming delivery
            if (timeout > 0) {
                session.getScheduler().schedule(() -> {
                    if (!request.isDone()) {
                        pending.remove(request);
                        request.complete(null);
                    }
                }, timeout, TimeUnit.MILLISECONDS);
            }
        } else {
            // Track last returned to update state for Round Robin next receiver dispatch
            // this effectively ties all policies together in updating the next result from
            // a call that requests the round robin fairness policy.
            result.protonLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, result);
            request.complete(result);
        }
    }

    /**
     * Installs this selector's delivery read handler on the session's current
     * proton session.
     */
    public void handleReconnect() {
        session.getProtonSession().deliveryReadHandler(this::deliveryReadHandler);
    }

    /**
     * Fails every pending request and empties the pending queue. The failure cause
     * reflects an explicit close when the session reports closed, otherwise the
     * session's failure cause when one is set, otherwise a generic closed error.
     */
    public void handleShutdown() {
        final ClientException cause;

        if (session.isClosed()) {
            cause = new ClientIllegalStateException("The Session was explicitly closed", session.getFailureCause());
        } else if (session.getFailureCause() != null) {
            cause = session.getFailureCause();
        } else {
            cause = new ClientIllegalStateException("The session was closed without a specific error being provided");
        }

        for (ClientFuture<Receiver> request : pending) {
            request.failed(cause);
        }

        pending.clear();
    }

    /**
     * Streams the client receivers of this session that currently report at least
     * one queued delivery, in the iteration order of the proton session's receivers.
     * Links whose linked resource is not a {@link ClientReceiver} are skipped.
     */
    private Stream<ClientReceiver> receiversWithQueuedDeliveries() {
        return session.getProtonSession().receivers().stream()
            .filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
                           r.getLinkedResource(ClientReceiver.class).queuedDeliveries() > 0)
            .map((r) -> (ClientReceiver) r.getLinkedResource());
    }

    /**
     * Picks one receiver at random among those with queued deliveries.
     *
     * @return the chosen receiver, or {@code null} when none has queued deliveries
     */
    private ClientReceiver selectRandomReceiver() {
        final ArrayList<ClientReceiver> candidates =
            receiversWithQueuedDeliveries().collect(Collectors.toCollection(ArrayList::new));

        if (candidates.isEmpty()) {
            // Nothing to choose from, so avoid creating the random source needlessly.
            return null;
        }

        if (srand == null) {
            srand = new SecureRandom();
        }

        Collections.shuffle(candidates, srand);

        return candidates.get(0);
    }

    /**
     * Round robin selection: returns the first receiver with queued deliveries that
     * follows the last returned receiver in the session's receiver iteration order.
     * When there is no usable last receiver, or nothing after it qualifies, the
     * search restarts from the beginning via {@link #selectFirstAvailable()}.
     *
     * @return the next receiver with queued deliveries, or {@code null} when none has any
     */
    private ClientReceiver selectNextAvailable() {
        final ClientReceiver lastReceiver = session.getProtonSession().getAttachments().get(LAST_RETURNED_STATE_KEY);

        ClientReceiver result = null;

        if (lastReceiver != null && !lastReceiver.protonReceiver.isLocallyClosedOrDetached()) {
            boolean foundLast = false;
            for (org.apache.qpid.protonj2.engine.Receiver protonReceiver : session.getProtonSession().receivers()) {
                if (!(protonReceiver.getLinkedResource() instanceof ClientReceiver)) {
                    continue; // Only normal client receivers take part in the selection
                }

                if (!foundLast) {
                    foundLast = protonReceiver.getLinkedResource() == lastReceiver;
                    continue;
                }

                ClientReceiver candidate = protonReceiver.getLinkedResource();
                if (candidate.queuedDeliveries() > 0) {
                    result = candidate;
                    // Stop at the first match; continuing would overwrite it with a
                    // later receiver and break the round robin ordering.
                    break;
                }
            }
        } else {
            // The remembered receiver is absent or no longer usable, drop the stale state.
            session.getProtonSession().getAttachments().set(LAST_RETURNED_STATE_KEY, null);
        }

        return result != null ? result : selectFirstAvailable();
    }

    /**
     * @return the first receiver found with queued deliveries, or {@code null} when none has any
     */
    private ClientReceiver selectFirstAvailable() {
        return receiversWithQueuedDeliveries()
            .findFirst()
            .orElse(null);
    }

    /**
     * @return the receiver reporting the most queued deliveries, or {@code null} when none has any
     */
    private ClientReceiver selectLargestBacklog() {
        return receiversWithQueuedDeliveries()
            .max(Comparator.comparingLong(ClientReceiver::queuedDeliveries))
            .orElse(null);
    }

    /**
     * @return the receiver reporting the fewest (but at least one) queued deliveries,
     *         or {@code null} when none has any
     */
    private ClientReceiver selectSmallestBacklog() {
        return receiversWithQueuedDeliveries()
            .min(Comparator.comparingLong(ClientReceiver::queuedDeliveries))
            .orElse(null);
    }

    /**
     * Delivery read callback: when a complete (not partial, not aborted) delivery
     * arrives on a {@link ClientReceiver} link while requests are waiting, the
     * oldest pending request is completed with that receiver.
     *
     * @param delivery the incoming delivery that was read
     */
    private void deliveryReadHandler(IncomingDelivery delivery) {
        // When a new delivery arrives that is completed
        if (!pending.isEmpty() && !delivery.isPartial() && !delivery.isAborted()) {
            // We only handle next receiver events for normal client receivers and
            // not for stream receiver types etc.
            if (delivery.getLink().getLinkedResource() instanceof ClientReceiver) {
                ClientReceiver receiver = delivery.getLink().getLinkedResource();

                // Track last returned to update state for Round Robin next receiver dispatch
                delivery.getLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, receiver);

                pending.poll().complete(receiver);
            }
        }
    }
}