| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.fn.stream; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.BlockingQueue; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString; |
| |
| /** |
 * {@link DataStreamDecoder} treats multiple {@link ByteString}s as a single input stream, decoding
 * values from it with the supplied {@link Coder}. {@link #outbound(OutputChunkConsumer)} treats a
 * single {@link OutputStream} as multiple {@link ByteString}s.
| */ |
| @SuppressWarnings({ |
| "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) |
| "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) |
| }) |
| public class DataStreams { |
| public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_000_000; |
| |
| /** |
| * Converts a single element delimited {@link OutputStream} into multiple {@link ByteString |
| * ByteStrings}. |
| * |
| * <p>Note that users must call {@link ElementDelimitedOutputStream#delimitElement} after each |
| * element. |
| * |
| * <p>Note that this {@link OutputStream} follows the Beam Fn API specification for forcing values |
| * that encode producing zero bytes to produce exactly one byte. |
| */ |
| public static ElementDelimitedOutputStream outbound(OutputChunkConsumer<ByteString> consumer) { |
| return outbound(consumer, DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES); |
| } |
| |
| /** |
| * Converts a single element delimited {@link OutputStream} into multiple {@link ByteString |
| * ByteStrings} using the specified maximum chunk size. |
| * |
| * <p>Note that users must call {@link ElementDelimitedOutputStream#delimitElement} after each |
| * element. |
| * |
| * <p>Note that this {@link OutputStream} follows the Beam Fn API specification for forcing values |
| * that encode producing zero bytes to produce exactly one byte. |
| */ |
| public static ElementDelimitedOutputStream outbound( |
| OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) { |
| return new ElementDelimitedOutputStream(consumer, maximumChunkSize); |
| } |
| |
| /** |
| * An adapter which wraps an {@link OutputChunkConsumer} as an {@link OutputStream}. |
| * |
| * <p>Note that this adapter follows the Beam Fn API specification for forcing values that encode |
| * producing zero bytes to produce exactly one byte. |
| * |
| * <p>Note that users must invoke {@link #delimitElement} at each element boundary. |
| */ |
| public static final class ElementDelimitedOutputStream extends OutputStream { |
| private final OutputChunkConsumer<ByteString> consumer; |
| private final ByteString.Output output; |
| private final int maximumChunkSize; |
| int previousPosition; |
| |
| public ElementDelimitedOutputStream( |
| OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) { |
| this.consumer = consumer; |
| this.maximumChunkSize = maximumChunkSize; |
| this.output = ByteString.newOutput(maximumChunkSize); |
| } |
| |
| public void delimitElement() throws IOException { |
| // If the previous encoding was exactly zero bytes, output a single marker byte as per |
| // https://s.apache.org/beam-fn-api-send-and-receive-data |
| if (previousPosition == output.size()) { |
| write(0); |
| } |
| previousPosition = output.size(); |
| } |
| |
| @Override |
| public void write(int i) throws IOException { |
| output.write(i); |
| if (maximumChunkSize == output.size()) { |
| internalFlush(); |
| } |
| } |
| |
| @Override |
| public void write(byte[] b, int offset, int length) throws IOException { |
| int spaceRemaining = maximumChunkSize - output.size(); |
| // Fill the first partially filled buffer. |
| if (length > spaceRemaining) { |
| output.write(b, offset, spaceRemaining); |
| offset += spaceRemaining; |
| length -= spaceRemaining; |
| internalFlush(); |
| } |
| // Fill buffers completely. |
| while (length > maximumChunkSize) { |
| output.write(b, offset, maximumChunkSize); |
| offset += maximumChunkSize; |
| length -= maximumChunkSize; |
| internalFlush(); |
| } |
| // Fill any remainder. |
| output.write(b, offset, length); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (output.size() > 0) { |
| consumer.read(output.toByteString()); |
| } |
| output.close(); |
| } |
| |
| /** Can only be called if at least one byte has been written. */ |
| private void internalFlush() throws IOException { |
| consumer.read(output.toByteString()); |
| output.reset(); |
| // Set the previous position to an invalid position representing that a previous buffer |
| // was written to. |
| previousPosition = -1; |
| } |
| } |
| |
/**
 * A callback which receives the chunks produced by the {@link OutputStream} returned from {@link
 * #outbound(OutputChunkConsumer)}. It is invoked whenever that stream's buffer becomes full, and
 * on close if any bytes are still buffered.
 *
 * <p>This is a functional interface and can therefore be implemented with a lambda expression or
 * a method reference.
 *
 * @param <T> the type of chunk handed to the callback
 */
@FunctionalInterface
public interface OutputChunkConsumer<T> {
  /**
   * Handles a single chunk.
   *
   * @param chunk the chunk to process
   * @throws IOException if the chunk cannot be processed
   */
  void read(T chunk) throws IOException;
}
| |
| /** |
| * An adapter which converts an {@link InputStream} to a {@link PrefetchableIterator} of {@code T} |
| * values using the specified {@link Coder}. |
| * |
| * <p>Note that this adapter follows the Beam Fn API specification for forcing values that decode |
| * consuming zero bytes to consuming exactly one byte. |
| * |
| * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on |
| * first access to {@link #next}, {@link #hasNext}, {@link #isReady}, and {@link #prefetch}. |
| * |
| * <p>Note that {@link #isReady} and {@link #prefetch} rely on non-empty {@link ByteString}s being |
| * returned via the underlying {@link PrefetchableIterator} otherwise the {@link #prefetch} will |
| * seemingly make zero progress yet will actually advance through the empty pages. |
| */ |
| public static class DataStreamDecoder<T> implements PrefetchableIterator<T> { |
| |
| private enum State { |
| READ_REQUIRED, |
| HAS_NEXT, |
| EOF |
| } |
| |
| private final PrefetchableIterator<ByteString> inputByteStrings; |
| private final Inbound inbound; |
| private final Coder<T> coder; |
| private State currentState; |
| private T next; |
| |
| public DataStreamDecoder(Coder<T> coder, PrefetchableIterator<ByteString> inputStream) { |
| this.currentState = State.READ_REQUIRED; |
| this.coder = coder; |
| this.inputByteStrings = inputStream; |
| this.inbound = new Inbound(); |
| } |
| |
| @Override |
| public boolean isReady() { |
| switch (currentState) { |
| case EOF: |
| return true; |
| case READ_REQUIRED: |
| try { |
| return inbound.isReady(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| case HAS_NEXT: |
| return true; |
| default: |
| throw new IllegalStateException(String.format("Unknown state %s", currentState)); |
| } |
| } |
| |
| @Override |
| public void prefetch() { |
| if (!isReady()) { |
| inputByteStrings.prefetch(); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| switch (currentState) { |
| case EOF: |
| return false; |
| case READ_REQUIRED: |
| try { |
| if (inbound.isEof()) { |
| currentState = State.EOF; |
| return false; |
| } |
| |
| long previousPosition = inbound.position; |
| next = coder.decode(inbound); |
| // Skip one byte if decoding the value consumed 0 bytes. |
| if (inbound.position - previousPosition == 0) { |
| checkState(inbound.read() != -1, "Unexpected EOF reached"); |
| } |
| currentState = State.HAS_NEXT; |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| return true; |
| case HAS_NEXT: |
| return true; |
| } |
| throw new IllegalStateException(String.format("Unknown state %s", currentState)); |
| } |
| |
| @Override |
| public T next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| currentState = State.READ_REQUIRED; |
| return next; |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private static final InputStream EMPTY_STREAM = ByteString.EMPTY.newInput(); |
| |
| /** |
| * An input stream which concatenates multiple {@link ByteString}s. Lazily accesses the {@link |
| * Iterator} on first access of this input stream. |
| * |
| * <p>Closing this input stream has no effect. |
| */ |
| private class Inbound extends InputStream { |
| private long position; |
| private InputStream currentStream; |
| |
| public Inbound() { |
| this.currentStream = EMPTY_STREAM; |
| } |
| |
| public boolean isReady() throws IOException { |
| // Note that ByteString#newInput is guaranteed to return the length of the entire ByteString |
| // minus the number of bytes that have been read so far and can be reliably used to tell |
| // us whether we are at the end of the stream. |
| while (currentStream.available() == 0) { |
| if (!inputByteStrings.isReady()) { |
| return false; |
| } |
| if (!inputByteStrings.hasNext()) { |
| return true; |
| } |
| currentStream = inputByteStrings.next().newInput(); |
| } |
| return true; |
| } |
| |
| public boolean isEof() throws IOException { |
| // Note that ByteString#newInput is guaranteed to return the length of the entire ByteString |
| // minus the number of bytes that have been read so far and can be reliably used to tell |
| // us whether we are at the end of the stream. |
| while (currentStream.available() == 0) { |
| if (!inputByteStrings.hasNext()) { |
| return true; |
| } |
| currentStream = inputByteStrings.next().newInput(); |
| } |
| return false; |
| } |
| |
| @Override |
| public int read() throws IOException { |
| int read; |
| // Move on to the next stream if this stream is done |
| while ((read = currentStream.read()) == -1) { |
| if (!inputByteStrings.hasNext()) { |
| return -1; |
| } |
| currentStream = inputByteStrings.next().newInput(); |
| } |
| position += 1; |
| return read; |
| } |
| |
| @Override |
| public int read(byte[] b, int off, int len) throws IOException { |
| int remainingLen = len; |
| while (remainingLen > 0) { |
| int read; |
| // Move on to the next stream if this stream is done. Note that ByteString.newInput |
| // guarantees that read will consume the entire ByteString if the passed in length is |
| // greater than or equal to the remaining amount. |
| while ((read = currentStream.read(b, off + len - remainingLen, remainingLen)) == -1) { |
| if (!inputByteStrings.hasNext()) { |
| int bytesRead = len - remainingLen; |
| position += bytesRead; |
| return bytesRead > 0 ? bytesRead : -1; |
| } |
| currentStream = inputByteStrings.next().newInput(); |
| } |
| remainingLen -= read; |
| } |
| position += len; |
| return len; |
| } |
| } |
| } |
| |
| /** |
| * Allows for one or more writing threads to append values to this iterator while one reading |
| * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is |
| * available or this has been closed. |
| * |
| * <p>External synchronization must be provided if multiple readers would like to access the |
| * {@link Iterator#hasNext()} and {@link Iterator#next()} methods. |
| * |
| * <p>The order or values which are appended to this iterator is nondeterministic when multiple |
| * threads call {@link #accept(Object)}. |
| */ |
| public static class BlockingQueueIterator<T> implements AutoCloseable, Iterator<T> { |
| private static final Object POISION_PILL = new Object(); |
| private final BlockingQueue<T> queue; |
| |
| /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */ |
| private T currentElement; |
| |
| public BlockingQueueIterator(BlockingQueue<T> queue) { |
| this.queue = queue; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| queue.put((T) POISION_PILL); |
| } |
| |
| public void accept(T t) throws Exception { |
| queue.put(t); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| if (currentElement == null) { |
| try { |
| currentElement = queue.take(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IllegalStateException(e); |
| } |
| } |
| return currentElement != POISION_PILL; |
| } |
| |
| @Override |
| public T next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| T rval = currentElement; |
| currentElement = null; |
| return rval; |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |