blob: 1ba5537a8c1397bbe0af5563f655098728b18854 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.CountingOutputStream;
/**
* A {@link Coder Coder&lt;T&gt;} defines how to encode and decode values of type {@code T} into
* byte streams.
*
* <p>{@link Coder} instances are serialized during job creation and deserialized before use. This
* will generally be performed by serializing the object via Java Serialization.
*
* <p>{@link Coder} classes for compound types are often composed from coder classes for types
* contained therein. The composition of {@link Coder} instances into a coder for the compound class
* is the subject of the {@link CoderProvider} type, which enables automatic generic composition of
* {@link Coder} classes within the {@link CoderRegistry}. See {@link CoderProvider} and {@link
* CoderRegistry} for more information about how coders are inferred.
*
* <p>All methods of a {@link Coder} are required to be thread safe.
*
* @param <T> the type of values being encoded and decoded
*/
public abstract class Coder<T> implements Serializable {
/**
 * Describes where a value sits inside the record/stream that is being encoded or decoded.
 *
 * @deprecated To implement a coder, do not use any {@link Context}. Implement only the abstract
 *     methods which do not accept a {@link Context} and keep the default implementations of the
 *     methods that do accept one.
 */
@Deprecated
@Experimental(Kind.CODER_CONTEXT)
public static class Context {
  /**
   * Outer context: the value being encoded or decoded occupies the remainder of the
   * record/stream contents.
   */
  public static final Context OUTER = new Context(true);

  /**
   * Nested context: the value being encoded or decoded is (potentially) one part of a larger
   * record/stream, and other parts may be encoded or decoded after it.
   */
  public static final Context NESTED = new Context(false);

  /**
   * {@code true} when the value fills the remainder of the output (when encoding) or input (when
   * decoding). In that case the size of the value can be derived from what is left of the
   * record/stream, so explicit lengths are not required.
   */
  public final boolean isWholeStream;

  /**
   * Creates a context.
   *
   * @param isWholeStream whether the value fills the remainder of the record/stream
   */
  public Context(boolean isWholeStream) {
    this.isWholeStream = isWholeStream;
  }

  /** Returns the shared {@link #NESTED} context, regardless of what this context is. */
  public Context nested() {
    return NESTED;
  }

  /** Two contexts are equal exactly when their {@link #isWholeStream} flags match. */
  @Override
  public boolean equals(Object obj) {
    if (obj instanceof Context) {
      Context that = (Context) obj;
      return this.isWholeStream == that.isWholeStream;
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(isWholeStream);
  }

  /** Renders the context with the label {@code OUTER} or {@code NESTED}. */
  @Override
  public String toString() {
    String label = isWholeStream ? "OUTER" : "NESTED";
    return MoreObjects.toStringHelper(Context.class).addValue(label).toString();
  }
}
/**
* Encodes the given value of type {@code T} onto the given output stream.
*
* @param value the value to encode
* @param outStream the stream that receives the encoded bytes
* @throws IOException if writing to the {@code OutputStream} fails for some reason
* @throws CoderException if the value could not be encoded for some reason
*/
public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException;
/**
 * Encodes the given value of type {@code T} onto the given output stream in the given context.
 *
 * <p>This default implementation does not consult {@code context}; it forwards directly to
 * {@link #encode(Object value, OutputStream)}.
 *
 * @param value the value to encode
 * @param outStream the stream that receives the encoded bytes
 * @param context the encoding context (unused by this default implementation)
 * @throws IOException if writing to the {@code OutputStream} fails for some reason
 * @throws CoderException if the value could not be encoded for some reason
 * @deprecated only implement and call {@link #encode(Object value, OutputStream)}
 */
@Deprecated
@Experimental(Kind.CODER_CONTEXT)
public void encode(T value, OutputStream outStream, Context context)
    throws CoderException, IOException {
  this.encode(value, outStream);
}
/**
* Decodes a value of type {@code T} from the given input stream. Returns the decoded value.
*
* @param inStream the stream to read the encoded bytes from
* @return the decoded value
* @throws IOException if reading from the {@code InputStream} fails for some reason
* @throws CoderException if the value could not be decoded for some reason
*/
public abstract T decode(InputStream inStream) throws CoderException, IOException;
/**
 * Decodes a value of type {@code T} from the given input stream in the given context and returns
 * it.
 *
 * <p>This default implementation does not consult {@code context}; it forwards directly to
 * {@link #decode(InputStream)}.
 *
 * @param inStream the stream to read the encoded bytes from
 * @param context the decoding context (unused by this default implementation)
 * @return the decoded value
 * @throws IOException if reading from the {@code InputStream} fails for some reason
 * @throws CoderException if the value could not be decoded for some reason
 * @deprecated only implement and call {@link #decode(InputStream)}
 */
@Deprecated
@Experimental(Kind.CODER_CONTEXT)
public T decode(InputStream inStream, Context context) throws CoderException, IOException {
  T decoded = decode(inStream);
  return decoded;
}
/**
* If this is a {@link Coder} for a parameterized type, returns the list of {@link Coder}s being
* used for each of the parameters in the same order they appear within the parameterized type's
* type signature. If this cannot be done, or this {@link Coder} does not encode/decode a
* parameterized type, returns the empty list.
*
* @return the component coders in type-parameter order, or the empty list when not applicable
*/
public abstract List<? extends Coder<?>> getCoderArguments();
/**
* Throws {@link NonDeterministicException} if the coding is not deterministic.
*
* <p>In order for a {@code Coder} to be considered deterministic, the following must be true:
*
* <ul>
* <li>two values that compare as equal (via {@code Object.equals()} or {@code
* Comparable.compareTo()}, if supported) have the same encoding.
* <li>the {@code Coder} always produces a canonical encoding, which is the same for an instance
* of an object even if produced on different computers at different times.
* </ul>
*
* @throws Coder.NonDeterministicException if this coder is not deterministic.
*/
public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
/**
 * Checks, in iteration order, that every coder in {@code coders} is deterministic.
 *
 * <p>Checking stops at the first coder whose {@link #verifyDeterministic()} fails. That failure
 * is rethrown as a new {@link NonDeterministicException} attributed to {@code target}, carrying
 * {@code message} as its reason and the original exception as its cause.
 *
 * @param target the coder the resulting exception is reported against
 * @param message the reason recorded in the resulting exception
 * @param coders the coders to verify
 * @throws NonDeterministicException if any of the provided coders is not deterministic
 */
public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
    throws NonDeterministicException {
  try {
    for (Coder<?> component : coders) {
      component.verifyDeterministic();
    }
  } catch (NonDeterministicException componentFailure) {
    throw new NonDeterministicException(target, message, componentFailure);
  }
}
/**
 * Varargs convenience overload: checks that every one of the given coders is deterministic and,
 * if any is not, throws a {@link NonDeterministicException} for the {@code target} {@link Coder}.
 *
 * @param target the coder the resulting exception is reported against
 * @param message the reason recorded in the resulting exception
 * @param coders the coders to verify
 * @throws NonDeterministicException if any of the provided coders is not deterministic
 */
public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
    throws NonDeterministicException {
  List<Coder<?>> coderList = Arrays.asList(coders);
  verifyDeterministic(target, message, coderList);
}
/**
* Returns {@code true} if this {@link Coder} is injective with respect to {@link
* java.util.Objects#equals}.
*
* <p>Whenever the encoded bytes of two values are equal, then the original values are equal
* according to {@code Objects.equals()}. Note that this is well-defined for {@code null}.
*
* <p>This condition is most notably false for arrays. More generally, this condition is false
* whenever {@code equals()} compares object identity, rather than performing a
* semantic/structural comparison.
*
* <p>By default, returns false.
*
* @return {@code false} in this base implementation
*/
public boolean consistentWithEquals() {
return false;
}
/**
 * Returns an object whose {@code Object.equals()} method represents structural equality on the
 * argument.
 *
 * <p>Contract: for any two values {@code x} and {@code y} of type {@code T} whose encoded bytes
 * are the same, {@code structuralValue(x).equals(structuralValue(y))} must hold.
 *
 * <p>Most notably:
 *
 * <ul>
 *   <li>The structural value for an array coder should compare the contents of the arrays
 *       structurally, rather than falling back to object identity.
 *   <li>The structural value for a coder accepting {@code null} should be a proper object with an
 *       {@code equals()} method, even if the input value is {@code null}.
 * </ul>
 *
 * <p>See also {@link #consistentWithEquals()}.
 *
 * <p>This default implementation returns {@code value} itself when it is non-null and this coder
 * is {@link #consistentWithEquals()}. In every other case the value is encoded in the outer
 * context into a {@code byte[]}, which is returned wrapped in a {@code StructuralByteArray}.
 *
 * @param value the value to derive a structural value for; may be {@code null}
 * @return {@code value} itself, or a wrapper around its encoded bytes
 * @throws IllegalArgumentException if encoding the value fails for any reason; the original
 *     failure is attached as the cause
 */
public Object structuralValue(T value) {
  boolean mustCompareEncodedBytes = value == null || !consistentWithEquals();
  if (!mustCompareEncodedBytes) {
    return value;
  }

  try {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    encode(value, buffer, Context.OUTER);
    byte[] encodedBytes = buffer.toByteArray();
    return new StructuralByteArray(encodedBytes);
  } catch (Exception exn) {
    String description = "Unable to encode element '" + value + "' with coder '" + this + "'.";
    throw new IllegalArgumentException(description, exn);
  }
}
/**
* Returns whether {@link #registerByteSizeObserver} is cheap enough to call for every element,
* that is, if this {@code Coder} can calculate the byte size of the element to be coded in roughly
* constant time (or lazily).
*
* <p>Not intended to be called by user code, but instead by {@link PipelineRunner}
* implementations.
*
* <p>By default, returns false. The default {@link #registerByteSizeObserver} implementation
* invokes {@link #getEncodedElementByteSize} which requires re-encoding an element unless it is
* overridden. This is considered expensive.
*
* @param value the element whose size observation is being considered (ignored by this base
* implementation)
* @return {@code false} in this base implementation
*/
public boolean isRegisterByteSizeObserverCheap(T value) {
return false;
}
/**
 * Reports to the given {@code ElementByteSizeObserver} how many bytes {@code value} occupies when
 * encoded with this {@code Coder}.
 *
 * <p>Not intended to be called by user code, but instead by {@link PipelineRunner}
 * implementations.
 *
 * <p>This default implementation obtains the size from {@link #getEncodedElementByteSize} and
 * passes it to {@code observer.update}.
 *
 * @param value the element whose encoded size is reported
 * @param observer the observer to notify
 * @throws Exception if the size cannot be determined or the observer fails
 */
public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) throws Exception {
  long encodedSize = getEncodedElementByteSize(value);
  observer.update(encodedSize);
}
/**
 * Returns the size in bytes of the encoded value using this coder.
 *
 * <p>This default implementation encodes {@code value} with {@link #encode(Object,
 * OutputStream)} into a {@code CountingOutputStream} wrapped around {@code
 * ByteStreams.nullOutputStream()} and returns the resulting count.
 *
 * @param value the element to measure
 * @return the count reported by the counting stream after encoding
 * @throws IllegalArgumentException if anything fails while encoding or closing the stream; the
 *     original failure is attached as the cause
 */
protected long getEncodedElementByteSize(T value) throws Exception {
  try (CountingOutputStream counter = new CountingOutputStream(ByteStreams.nullOutputStream())) {
    encode(value, counter);
    long byteCount = counter.getCount();
    return byteCount;
  } catch (Exception exn) {
    String description = "Unable to encode element '" + value + "' with coder '" + this + "'.";
    throw new IllegalArgumentException(description, exn);
  }
}
/**
 * Returns the {@link TypeDescriptor} for the type encoded.
 *
 * <p>This default implementation captures the type variable {@code T} through an anonymous
 * {@code TypeDescriptor<T>} subclass and asks the descriptor of this coder's runtime class
 * ({@code getClass()}) to resolve it.
 *
 * @return the descriptor produced by resolving {@code T} against this coder's runtime class
 */
@Experimental(Kind.CODER_TYPE_ENCODING)
public TypeDescriptor<T> getEncodedTypeDescriptor() {
  // The cast only changes the generic type argument, which the compiler cannot verify and which
  // is erased at runtime, so it cannot fail here. The suppression is scoped to this one local.
  // NOTE(review): resolveType is declared outside this file; presumably it returns a
  // wildcard-typed descriptor, which is why the cast is needed. Confirm.
  @SuppressWarnings("unchecked")
  TypeDescriptor<T> resolved =
      (TypeDescriptor<T>)
          TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
  return resolved;
}
/**
 * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is not deterministic,
 * including details of why the encoding is not deterministic.
 */
public static class NonDeterministicException extends Exception {
  // Both fields are assigned exactly once, in the four-argument constructor that every other
  // constructor delegates to, so they are declared final.
  private final Coder<?> coder;
  private final List<String> reasons;

  /**
   * Creates an exception with a single reason and an optional underlying cause.
   *
   * @param coder the coder that is not deterministic
   * @param reason why the coder is not deterministic
   * @param e the exception that led to this one, or {@code null}
   */
  public NonDeterministicException(
      Coder<?> coder, String reason, @Nullable NonDeterministicException e) {
    this(coder, Arrays.asList(reason), e);
  }

  /**
   * Creates an exception with a single reason and no cause.
   *
   * @param coder the coder that is not deterministic
   * @param reason why the coder is not deterministic
   */
  public NonDeterministicException(Coder<?> coder, String reason) {
    this(coder, Arrays.asList(reason), null);
  }

  /**
   * Creates an exception with several reasons and no cause.
   *
   * @param coder the coder that is not deterministic
   * @param reasons why the coder is not deterministic; must not be empty
   */
  public NonDeterministicException(Coder<?> coder, List<String> reasons) {
    this(coder, reasons, null);
  }

  /**
   * Creates an exception with several reasons and an optional underlying cause.
   *
   * @param coder the coder that is not deterministic
   * @param reasons why the coder is not deterministic; must not be empty. The list is stored as
   *     given, without copying.
   * @param cause the exception that led to this one, or {@code null}
   * @throws IllegalArgumentException if {@code reasons} is empty
   */
  public NonDeterministicException(
      Coder<?> coder, List<String> reasons, @Nullable NonDeterministicException cause) {
    super(cause);
    checkArgument(!reasons.isEmpty(), "Reasons must not be empty.");
    this.reasons = reasons;
    this.coder = coder;
  }

  /** Returns the reasons supplied at construction time. */
  public Iterable<String> getReasons() {
    return reasons;
  }

  /**
   * Returns a message naming the coder, followed by the reasons, each on its own tab-indented
   * line.
   */
  @Override
  public String getMessage() {
    String reasonsStr = Joiner.on("\n\t").join(reasons);
    return coder + " is not deterministic because:\n\t" + reasonsStr;
  }
}
}