| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.coders; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; |
| |
| /** |
| * A {@code DelegateCoder<T, IntermediateT>} wraps a {@link Coder} for {@code IntermediateT} and |
| * encodes/decodes values of type {@code T} by converting to/from {@code IntermediateT} and then |
| * encoding/decoding using the underlying {@code Coder<IntermediateT>}. |
| * |
| * <p>The conversions from {@code T} to {@code IntermediateT} and vice versa must be supplied as |
| * {@link CodingFunction}, a serializable function that may throw any {@code Exception}. If a thrown |
| * exception is an instance of {@link CoderException} or {@link IOException}, it will be re-thrown, |
| * otherwise it will be wrapped as a {@link CoderException}. |
| * |
| * @param <T> The type of objects coded by this Coder. |
| * @param <IntermediateT> The type of objects a {@code T} will be converted to for coding. |
| */ |
| public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> { |
| /** |
| * A {@link DelegateCoder.CodingFunction CodingFunction<InputT, OutputT>} is a serializable |
| * function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}. |
| */ |
| public interface CodingFunction<InputT, OutputT> extends Serializable { |
| OutputT apply(InputT input) throws Exception; |
| } |
| |
| public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of( |
| Coder<IntermediateT> coder, |
| CodingFunction<T, IntermediateT> toFn, |
| CodingFunction<IntermediateT, T> fromFn) { |
| return of(coder, toFn, fromFn, null); |
| } |
| |
| public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of( |
| Coder<IntermediateT> coder, |
| CodingFunction<T, IntermediateT> toFn, |
| CodingFunction<IntermediateT, T> fromFn, |
| @Nullable TypeDescriptor<T> typeDescriptor) { |
| return new DelegateCoder<>(coder, toFn, fromFn, typeDescriptor); |
| } |
| |
| @Override |
| public void encode(T value, OutputStream outStream) throws CoderException, IOException { |
| encode(value, outStream, Context.NESTED); |
| } |
| |
| @Override |
| public void encode(T value, OutputStream outStream, Context context) |
| throws CoderException, IOException { |
| coder.encode(applyAndWrapExceptions(toFn, value), outStream, context); |
| } |
| |
| @Override |
| public T decode(InputStream inStream) throws CoderException, IOException { |
| return decode(inStream, Context.NESTED); |
| } |
| |
| @Override |
| public T decode(InputStream inStream, Context context) throws CoderException, IOException { |
| return applyAndWrapExceptions(fromFn, coder.decode(inStream, context)); |
| } |
| |
| /** |
| * Returns the coder used to encode/decode the intermediate values produced/consumed by the coding |
| * functions of this {@code DelegateCoder}. |
| */ |
| public Coder<IntermediateT> getCoder() { |
| return coder; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws NonDeterministicException when the underlying coder's {@code verifyDeterministic()} |
| * throws a {@link Coder.NonDeterministicException}. For this to be safe, the intermediate |
| * {@code CodingFunction<T, IntermediateT>} must also be deterministic. |
| */ |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| coder.verifyDeterministic(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @return a structural for a value of type {@code T} obtained by first converting to {@code |
| * IntermediateT} and then obtaining a structural value according to the underlying coder. |
| */ |
| @Override |
| public Object structuralValue(T value) { |
| try { |
| IntermediateT intermediate = toFn.apply(value); |
| return coder.structuralValue(intermediate); |
| } catch (Exception exn) { |
| throw new IllegalArgumentException( |
| "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o == null || this.getClass() != o.getClass()) { |
| return false; |
| } |
| DelegateCoder<?, ?> that = (DelegateCoder<?, ?>) o; |
| return Objects.equal(this.coder, that.coder) |
| && Objects.equal(this.toFn, that.toFn) |
| && Objects.equal(this.fromFn, that.fromFn); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(this.coder, this.toFn, this.fromFn); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(getClass()) |
| .add("coder", coder) |
| .add("toFn", toFn) |
| .add("fromFn", fromFn) |
| .toString(); |
| } |
| |
| @Override |
| public TypeDescriptor<T> getEncodedTypeDescriptor() { |
| if (typeDescriptor == null) { |
| return super.getEncodedTypeDescriptor(); |
| } |
| return typeDescriptor; |
| } |
| |
| private String delegateEncodingId(Class<?> delegateClass, String encodingId) { |
| return String.format("%s:%s", delegateClass.getName(), encodingId); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| private <InputT, OutputT> OutputT applyAndWrapExceptions( |
| CodingFunction<InputT, OutputT> fn, InputT input) throws CoderException, IOException { |
| try { |
| return fn.apply(input); |
| } catch (IOException exc) { |
| throw exc; |
| } catch (Exception exc) { |
| throw new CoderException(exc); |
| } |
| } |
| |
| private final Coder<IntermediateT> coder; |
| private final CodingFunction<T, IntermediateT> toFn; |
| private final CodingFunction<IntermediateT, T> fromFn; |
| |
| // null unless the user explicitly provides a TypeDescriptor. |
| // If null, then the machinery from the superclass (StructuredCoder) will be used |
| // to try to deduce a good type descriptor. |
| @Nullable private final TypeDescriptor<T> typeDescriptor; |
| |
| protected DelegateCoder( |
| Coder<IntermediateT> coder, |
| CodingFunction<T, IntermediateT> toFn, |
| CodingFunction<IntermediateT, T> fromFn, |
| @Nullable TypeDescriptor<T> typeDescriptor) { |
| this.coder = coder; |
| this.fromFn = fromFn; |
| this.toFn = toFn; |
| this.typeDescriptor = typeDescriptor; |
| } |
| } |