// blob: 2abfdf1e2956f220e7fe5f4a6288da1e64fd1668 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
/**
* A {@code DelegateCoder<T, IntermediateT>} wraps a {@link Coder} for {@code IntermediateT} and
* encodes/decodes values of type {@code T} by converting to/from {@code IntermediateT} and then
* encoding/decoding using the underlying {@code Coder<IntermediateT>}.
*
* <p>The conversions from {@code T} to {@code IntermediateT} and vice versa must be supplied as
* {@link CodingFunction}, a serializable function that may throw any {@code Exception}. If a thrown
* exception is an instance of {@link CoderException} or {@link IOException}, it will be re-thrown,
* otherwise it will be wrapped as a {@link CoderException}.
*
* @param <T> The type of objects coded by this Coder.
* @param <IntermediateT> The type of objects a {@code T} will be converted to for coding.
*/
public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
  /**
   * A {@link DelegateCoder.CodingFunction CodingFunction&lt;InputT, OutputT&gt;} is a serializable
   * function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}.
   */
  public interface CodingFunction<InputT, OutputT> extends Serializable {
    /**
     * Converts {@code input} into an {@code OutputT}.
     *
     * @param input the value to convert
     * @return the converted value
     * @throws Exception if the conversion fails for any reason
     */
    OutputT apply(InputT input) throws Exception;
  }

  /**
   * Creates a {@code DelegateCoder} without an explicit {@link TypeDescriptor}.
   *
   * <p>Equivalent to calling {@link #of(Coder, CodingFunction, CodingFunction, TypeDescriptor)}
   * with a {@code null} type descriptor.
   *
   * @param coder the coder used for the intermediate values
   * @param toFn converts a {@code T} to an {@code IntermediateT} before encoding
   * @param fromFn converts a decoded {@code IntermediateT} back to a {@code T}
   * @return a new {@code DelegateCoder}
   */
  public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(
      Coder<IntermediateT> coder,
      CodingFunction<T, IntermediateT> toFn,
      CodingFunction<IntermediateT, T> fromFn) {
    return of(coder, toFn, fromFn, null);
  }

  /**
   * Creates a {@code DelegateCoder}, optionally with an explicit {@link TypeDescriptor}.
   *
   * @param coder the coder used for the intermediate values
   * @param toFn converts a {@code T} to an {@code IntermediateT} before encoding
   * @param fromFn converts a decoded {@code IntermediateT} back to a {@code T}
   * @param typeDescriptor the type descriptor returned by {@link #getEncodedTypeDescriptor()}, or
   *     {@code null} to fall back to the superclass implementation
   * @return a new {@code DelegateCoder}
   */
  public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(
      Coder<IntermediateT> coder,
      CodingFunction<T, IntermediateT> toFn,
      CodingFunction<IntermediateT, T> fromFn,
      @Nullable TypeDescriptor<T> typeDescriptor) {
    return new DelegateCoder<>(coder, toFn, fromFn, typeDescriptor);
  }

  /**
   * Encodes {@code value} in the {@code Context.NESTED} context.
   *
   * @throws CoderException if the conversion function throws a non-{@link IOException} exception
   * @throws IOException if the conversion function or the underlying coder throws one
   */
  @Override
  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
    encode(value, outStream, Context.NESTED);
  }

  /**
   * Converts {@code value} to its intermediate form with the "to" function and encodes the result
   * with the underlying coder in the given {@code context}.
   *
   * @throws CoderException if the conversion function throws a non-{@link IOException} exception
   * @throws IOException if the conversion function or the underlying coder throws one
   */
  @Override
  public void encode(T value, OutputStream outStream, Context context)
      throws CoderException, IOException {
    coder.encode(applyAndWrapExceptions(toFn, value), outStream, context);
  }

  /**
   * Decodes a value in the {@code Context.NESTED} context.
   *
   * @throws CoderException if the conversion function throws a non-{@link IOException} exception
   * @throws IOException if the conversion function or the underlying coder throws one
   */
  @Override
  public T decode(InputStream inStream) throws CoderException, IOException {
    return decode(inStream, Context.NESTED);
  }

  /**
   * Decodes an intermediate value with the underlying coder in the given {@code context} and
   * converts it to a {@code T} with the "from" function.
   *
   * @throws CoderException if the conversion function throws a non-{@link IOException} exception
   * @throws IOException if the conversion function or the underlying coder throws one
   */
  @Override
  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
    return applyAndWrapExceptions(fromFn, coder.decode(inStream, context));
  }

  /**
   * Returns the coder used to encode/decode the intermediate values produced/consumed by the coding
   * functions of this {@code DelegateCoder}.
   */
  public Coder<IntermediateT> getCoder() {
    return coder;
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException when the underlying coder's {@code verifyDeterministic()}
   *     throws a {@link Coder.NonDeterministicException}. For this to be safe, the intermediate
   *     {@code CodingFunction<T, IntermediateT>} must also be deterministic.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Only the underlying coder is checked; the coding functions themselves cannot be verified.
    coder.verifyDeterministic();
  }

  /**
   * {@inheritDoc}
   *
   * @return a structural value for a value of type {@code T} obtained by first converting to {@code
   *     IntermediateT} and then obtaining a structural value according to the underlying coder.
   * @throws IllegalArgumentException if the conversion or the underlying coder throws any
   *     exception; the original exception is preserved as the cause
   */
  @Override
  public Object structuralValue(T value) {
    try {
      IntermediateT intermediate = toFn.apply(value);
      return coder.structuralValue(intermediate);
    } catch (Exception exn) {
      throw new IllegalArgumentException(
          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
    }
  }

  /**
   * Two {@code DelegateCoder}s are equal when they have the same runtime class and equal underlying
   * coder, "to" function and "from" function. The optional type descriptor is not compared.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      // Identity fast path; the field-wise comparison below would also return true.
      return true;
    }
    if (o == null || this.getClass() != o.getClass()) {
      return false;
    }
    DelegateCoder<?, ?> that = (DelegateCoder<?, ?>) o;
    return Objects.equal(this.coder, that.coder)
        && Objects.equal(this.toFn, that.toFn)
        && Objects.equal(this.fromFn, that.fromFn);
  }

  /** Hashes the same three components that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hashCode(this.coder, this.toFn, this.fromFn);
  }

  /** Returns a string listing the underlying coder and both coding functions. */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(getClass())
        .add("coder", coder)
        .add("toFn", toFn)
        .add("fromFn", fromFn)
        .toString();
  }

  /**
   * Returns the type descriptor supplied at construction time, or the superclass's result when none
   * was supplied.
   */
  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      return super.getEncodedTypeDescriptor();
    }
    return typeDescriptor;
  }

  /////////////////////////////////////////////////////////////////////////////

  /**
   * Applies {@code fn} to {@code input}, normalizing whatever it throws.
   *
   * <p>An {@link IOException} is re-thrown unchanged; any other {@link Exception} is wrapped in a
   * new {@link CoderException} with the original as its cause.
   */
  private <InputT, OutputT> OutputT applyAndWrapExceptions(
      CodingFunction<InputT, OutputT> fn, InputT input) throws CoderException, IOException {
    try {
      return fn.apply(input);
    } catch (IOException exc) {
      throw exc;
    } catch (Exception exc) {
      throw new CoderException(exc);
    }
  }

  /** Coder for the intermediate representation. */
  private final Coder<IntermediateT> coder;

  /** Converts a {@code T} to its intermediate representation before encoding. */
  private final CodingFunction<T, IntermediateT> toFn;

  /** Converts a decoded intermediate value back to a {@code T}. */
  private final CodingFunction<IntermediateT, T> fromFn;

  // null unless the user explicitly provides a TypeDescriptor.
  // If null, then the machinery from the superclass (StructuredCoder) will be used
  // to try to deduce a good type descriptor.
  @Nullable private final TypeDescriptor<T> typeDescriptor;

  /**
   * Stores the given components without validation; use the {@code of} factories to create
   * instances.
   */
  protected DelegateCoder(
      Coder<IntermediateT> coder,
      CodingFunction<T, IntermediateT> toFn,
      CodingFunction<IntermediateT, T> fromFn,
      @Nullable TypeDescriptor<T> typeDescriptor) {
    this.coder = coder;
    this.fromFn = fromFn;
    this.toFn = toFn;
    this.typeDescriptor = typeDescriptor;
  }
}