blob: bbbfa65211fb8551dc68f09279813bb2937e1e86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protonj2.buffer.ProtonCompositeBuffer;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.util.StringUtils;
import org.apache.qpid.protonj2.types.messaging.Accepted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link StreamDelivery} implementation that provides the mechanics of reading message
* types from an incoming split framed transfer.
*/
public final class ClientStreamDelivery extends ClientDeliverable<ClientStreamDelivery, ClientStreamReceiver> implements StreamDelivery {
private static final Logger LOG = LoggerFactory.getLogger(ClientStreamDelivery.class);
private final ClientStreamReceiver receiver;
private final IncomingDelivery protonDelivery;
private ClientStreamReceiverMessage message;
private RawDeliveryInputStream rawInputStream;
ClientStreamDelivery(ClientStreamReceiver receiver, IncomingDelivery protonDelivery) {
super(receiver, protonDelivery);
this.receiver = receiver;
this.protonDelivery = protonDelivery.setLinkedResource(this);
// Already fully received delivery could be settled now
autoAcceptDeliveryIfNecessary();
// Capture inbound events and route to an active stream or message
protonDelivery.deliveryReadHandler(this::handleDeliveryRead)
.deliveryAbortedHandler(this::handleDeliveryAborted);
}
@Override
protected ClientStreamDelivery self() {
return this;
}
@Override
public ClientStreamReceiver receiver() {
return receiver;
}
@Override
public boolean aborted() {
return protonDelivery.isAborted();
}
@Override
public boolean completed() {
return !protonDelivery.isPartial();
}
@Override
public ClientStreamReceiverMessage message() throws ClientException {
if (rawInputStream != null && message == null) {
throw new ClientIllegalStateException("Cannot access Delivery Message API after requesting an InputStream");
}
if (message == null) {
message = new ClientStreamReceiverMessage(receiver, this, rawInputStream = new RawDeliveryInputStream());
}
return message;
}
@Override
public Map<String, Object> annotations() throws ClientException {
if (rawInputStream != null && message == null) {
throw new ClientIllegalStateException("Cannot access Delivery Annotations API after requesting an InputStream");
}
return StringUtils.toStringKeyedMap(message().deliveryAnnotations() != null ? message().deliveryAnnotations().getValue() : null);
}
@Override
public InputStream rawInputStream() throws ClientException {
if (message != null) {
throw new ClientIllegalStateException("Cannot access Delivery InputStream API after requesting an Message");
}
if (rawInputStream == null) {
rawInputStream = new RawDeliveryInputStream();
}
return rawInputStream;
}
//----- Event Handlers for Delivery updates
void handleDeliveryRead(IncomingDelivery delivery) {
try {
if (rawInputStream != null) {
rawInputStream.handleDeliveryRead(delivery);
}
} finally {
autoAcceptDeliveryIfNecessary();
}
}
void handleDeliveryAborted(IncomingDelivery delivery) {
try {
if (rawInputStream != null) {
rawInputStream.handleDeliveryAborted(delivery);
}
} finally {
try {
receiver.disposition(delivery, null, true);
} catch (Exception error) {
}
}
}
void handleReceiverClosed(ClientStreamReceiver receiver) {
if (rawInputStream != null) {
rawInputStream.handleReceiverClosed(receiver);
}
}
//----- Private stream delivery API
private void autoAcceptDeliveryIfNecessary() {
if (receiver.receiverOptions().autoAccept() && !protonDelivery.isSettled() && !protonDelivery.isPartial()) {
try {
receiver.disposition(protonDelivery, Accepted.getInstance(), receiver.receiverOptions().autoSettle());
} catch (Exception error) {
LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error);
}
}
}
//----- Raw InputStream Implementation
private class RawDeliveryInputStream extends InputStream {
private final int INVALID_MARK = -1;
private final ProtonCompositeBuffer buffer = new ProtonCompositeBuffer();
private final ScheduledExecutorService executor = receiver.session().getScheduler();
private ClientFuture<Integer> readRequest;
private AtomicBoolean closed = new AtomicBoolean();
private int markIndex = INVALID_MARK;
private int markLimit;
@Override
public void close() throws IOException {
markLimit = 0;
markIndex = INVALID_MARK;
if (closed.compareAndSet(false, true)) {
final ClientFuture<Void> closed = receiver.session().getFutureFactory().createFuture();
try {
executor.execute(() -> {
// If the deliver wasn't fully read either because there are remaining
// bytes locally we need to discard those to aid in retention avoidance.
// and to potentially open the session window to allow for fully reading
// and discarding any inbound bytes that remain.
try {
protonDelivery.readAll();
} catch (EngineFailedException efe) {
// Ignore as engine is down and we cannot read any more
}
// Clear anything that wasn't yet read and then clear any pending read request as EOF
buffer.setIndex(buffer.capacity(), buffer.capacity());
buffer.reclaimRead();
if (readRequest != null) {
readRequest.complete(-1);
readRequest = null;
}
closed.complete(null);
});
receiver.session().request(receiver, closed);
} catch (Exception error) {
LOG.debug("Ignoring error on RawInputStream close: ", error);
} finally {
super.close();
}
}
}
@Override
public boolean markSupported() {
return true;
}
@Override
public synchronized void mark(int readlimit) {
markIndex = buffer.getReadIndex();
markLimit = readlimit;
}
@Override
public synchronized void reset() throws IOException {
if (markIndex != INVALID_MARK) {
buffer.setReadIndex(markIndex);
markIndex = INVALID_MARK;
markLimit = 0;
}
}
@Override
public int available() throws IOException {
checkStreamStateIsValid();
// Check for any bytes in the delivery that haven't been moved to the read buffer yet
if (buffer.isReadable()) {
return buffer.getReadableBytes();
} else {
final ClientFuture<Integer> request = receiver.session().getFutureFactory().createFuture();
try {
executor.execute(() -> {
if (protonDelivery.available() > 0) {
buffer.append(protonDelivery.readAll());
}
request.complete(buffer.getReadableBytes());
});
return receiver.session().request(receiver, request);
} catch (Exception e) {
throw new IOException("Error reading requested data", e);
}
}
}
@Override
public int read() throws IOException {
checkStreamStateIsValid();
int result = -1;
while (true) {
if (buffer.isReadable()) {
result = buffer.readByte() & 0xff;
tryReleaseReadBuffers();
break;
} else if (requestMoreData() < 0) {
break;
}
}
return result;
}
@Override
public int read(byte target[], int offset, int length) throws IOException {
checkStreamStateIsValid();
Objects.checkFromIndexSize(offset, length, target.length);
int remaining = length;
int bytesRead = 0;
if (length <= 0) {
return 0;
}
while (remaining > 0) {
if (buffer.isReadable()) {
if (buffer.getReadableBytes() < remaining) {
final int readTarget = buffer.getReadableBytes();
buffer.readBytes(target, offset + bytesRead, buffer.getReadableBytes());
bytesRead += readTarget;
remaining -= readTarget;
} else {
buffer.readBytes(target, offset + bytesRead, remaining);
bytesRead += remaining;
remaining = 0;
}
tryReleaseReadBuffers();
} else if (requestMoreData() < 0) {
return bytesRead > 0 ? bytesRead : -1;
}
}
return bytesRead;
}
@Override
public long skip(long amount) throws IOException {
checkStreamStateIsValid();
long remaining = amount;
if (amount <= 0) {
return 0;
}
while (remaining > 0) {
if (buffer.isReadable()) {
if (buffer.getReadableBytes() < remaining) {
remaining -= buffer.getReadableBytes();
buffer.skipBytes(buffer.getReadableBytes());
} else {
buffer.skipBytes((int) remaining);
remaining = 0;
}
tryReleaseReadBuffers();
} else if (requestMoreData() < 0) {
break;
}
}
return amount - remaining;
}
@Override
public long transferTo(OutputStream target) throws IOException {
checkStreamStateIsValid();
// TODO: Implement efficient read and forward without intermediate copies
// from the currently available buffer to the output stream.
return super.transferTo(target);
}
private void tryReleaseReadBuffers() {
if (buffer.getReadIndex() - markIndex > markLimit) {
markIndex = INVALID_MARK;
markLimit = 0;
buffer.reclaimRead();
}
}
private void handleDeliveryRead(IncomingDelivery delivery) {
if (closed.get()) {
// Clear any pending data to expand session window if not yet complete
delivery.readAll();
} else {
// An input stream is awaiting some more incoming bytes, check to see if
// the delivery had a non-empty transfer frame and provide them.
if (readRequest != null) {
if (delivery.available() > 0) {
buffer.append(protonDelivery.readAll());
readRequest.complete(buffer.getReadableBytes());
} else if (!delivery.isPartial()) {
readRequest.complete(-1);
}
readRequest = null;
}
}
}
private void handleDeliveryAborted(IncomingDelivery delivery) {
if (readRequest != null) {
readRequest.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
}
}
private void handleReceiverClosed(ClientStreamReceiver receiver) {
if (readRequest != null) {
readRequest.failed(new ClientResourceRemotelyClosedException("The receiver link has been remotely closed."));
}
}
private int requestMoreData() throws IOException {
final ClientFuture<Integer> request = receiver.session().getFutureFactory().createFuture();
try {
executor.execute(() -> {
if (protonDelivery.getLink().isLocallyClosedOrDetached()) {
request.failed(new ClientException("Cannot read from delivery due to link having been closed"));
} else if (protonDelivery.available() > 0) {
buffer.append(protonDelivery.readAll());
request.complete(buffer.getReadableBytes());
} else if (protonDelivery.isAborted()) {
request.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
} else if (!protonDelivery.isPartial()) {
request.complete(-1);
} else {
readRequest = request;
}
});
return receiver.session().request(receiver, request);
} catch (Exception e) {
throw new IOException("Error reading requested data", e);
}
}
private void checkStreamStateIsValid() throws IOException {
if (closed.get()) {
throw new IOException("The InputStream has been explicitly closed");
}
if (receiver.isClosed()) {
throw new IOException("Underlying receiver has closed", receiver.getFailureCause());
}
}
}
}