blob: 69be44ff7b998bde89f5065df710b08fe0f0471a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.apex.translation.utils;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import com.datatorrent.api.Operator;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StructuredCoder;
/**
* The common interface for all objects transmitted through streams.
*
* @param <T> The actual payload type.
*/
public interface ApexStreamTuple<T> {
/**
* Gets the value carried by this tuple.
*
* @return the payload of the tuple; may be {@code null} (watermark tuples carry no payload)
*/
T getValue();
/**
* Data tuple class.
*
* @param <T> tuple type
*/
class DataTuple<T> implements ApexStreamTuple<T> {
  /**
   * Union tag of this tuple. {@link ApexStreamTupleCoder} writes it as a single byte, so values
   * should stay within {@code 0..255} if the tuple is going to be encoded.
   */
  private int unionTag;

  /** Payload carried by this tuple; may be {@code null}. */
  private T value;

  /**
   * Creates a data tuple wrapping {@code value} with union tag {@code 0}.
   *
   * @param value the payload to wrap
   * @param <T> payload type
   * @return a new tuple holding {@code value}
   */
  public static <T> DataTuple<T> of(T value) {
    return new DataTuple<>(value, 0);
  }

  /**
   * Creates a data tuple with an explicit union tag.
   *
   * @param value the payload to wrap
   * @param unionTag the union tag to assign
   */
  private DataTuple(T value, int unionTag) {
    this.value = value;
    this.unionTag = unionTag;
  }

  /** Returns the payload of this tuple. */
  @Override
  public T getValue() {
    return value;
  }

  /**
   * Replaces the payload of this tuple.
   *
   * @param value the new payload
   */
  public void setValue(T value) {
    this.value = value;
  }

  /** Returns the union tag of this tuple. */
  public int getUnionTag() {
    return unionTag;
  }

  /**
   * Replaces the union tag of this tuple.
   *
   * @param unionTag the new union tag
   */
  public void setUnionTag(int unionTag) {
    this.unionTag = unionTag;
  }

  /**
   * Returns the string form of the payload.
   *
   * @return {@code value.toString()}, or {@code "null"} when the payload is {@code null}
   */
  @Override
  public String toString() {
    // Objects.toString renders a null payload as "null" instead of throwing a
    // NullPointerException, which matters because subclasses may carry no payload.
    return Objects.toString(value);
  }
}
/**
* Tuple that includes a timestamp.
*
* @param <T> tuple type
*/
class TimestampedTuple<T> extends DataTuple<T> {
  /** Timestamp attached to this tuple. */
  private long timestamp;

  /**
   * Creates a tuple with the given timestamp and payload; the union tag is {@code 0}.
   *
   * @param timestamp the timestamp to attach
   * @param value the payload to wrap
   */
  public TimestampedTuple(long timestamp, T value) {
    super(value, 0);
    this.timestamp = timestamp;
  }

  /** Returns the timestamp of this tuple. */
  public long getTimestamp() {
    return timestamp;
  }

  /**
   * Replaces the timestamp of this tuple.
   *
   * @param timestamp the new timestamp
   */
  public void setTimestamp(long timestamp) {
    this.timestamp = timestamp;
  }

  /**
   * Hashes the same two components that {@link #equals(Object)} compares: the timestamp and the
   * payload. Including the payload keeps tuples that merely share a timestamp from all landing
   * in the same hash bucket.
   */
  @Override
  public int hashCode() {
    return Objects.hash(timestamp, getValue());
  }

  /**
   * Two timestamped tuples are equal when their timestamps match and their payloads are equal
   * according to {@link Objects#equals(Object, Object)}. The union tag is not part of the
   * comparison.
   *
   * @param obj the object to compare with
   * @return {@code true} if {@code obj} is a {@code TimestampedTuple} with the same timestamp and
   *     an equal payload
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof TimestampedTuple)) {
      return false;
    }
    TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
    return timestamp == other.timestamp && Objects.equals(getValue(), other.getValue());
  }
}
/**
* Tuple that represents a watermark.
*
* @param <T> tuple type
*/
class WatermarkTuple<T> extends TimestampedTuple<T> {
  /**
   * Builds a watermark for the given timestamp. A watermark carries no payload, so the value
   * handed to the superclass is {@code null}.
   *
   * @param timestamp the watermark timestamp
   */
  protected WatermarkTuple(long timestamp) {
    super(timestamp, null);
  }

  /**
   * Factory for watermark tuples.
   *
   * @param timestamp the watermark timestamp
   * @param <T> nominal payload type of the stream the watermark travels on
   * @return a new watermark tuple for {@code timestamp}
   */
  public static <T> WatermarkTuple<T> of(long timestamp) {
    WatermarkTuple<T> watermark = new WatermarkTuple<>(timestamp);
    return watermark;
  }

  /** Renders the watermark as {@code [Watermark <timestamp>]}. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("[Watermark ");
    text.append(getTimestamp());
    text.append("]");
    return text.toString();
  }
}
/** Coder for {@link ApexStreamTuple}. */
class ApexStreamTupleCoder<T> extends StructuredCoder<ApexStreamTuple<T>> {
  private static final long serialVersionUID = 1L;

  /** Leading byte of an encoded data tuple. */
  private static final int DATA_MARKER = 0;

  /** Leading byte of an encoded watermark tuple. */
  private static final int WATERMARK_MARKER = 1;

  /** Largest union tag that fits into the single byte reserved for it on the wire. */
  private static final int MAX_UNION_TAG = 0xFF;

  /** Coder used for the payload of data tuples. */
  final Coder<T> valueCoder;

  /**
   * Creates a tuple coder that delegates payload encoding to {@code valueCoder}.
   *
   * @param valueCoder coder for the payload; must not be {@code null}
   * @param <T> payload type
   * @return a new coder
   */
  public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
    return new ApexStreamTupleCoder<>(valueCoder);
  }

  /**
   * Creates a tuple coder that delegates payload encoding to {@code valueCoder}.
   *
   * @param valueCoder coder for the payload; must not be {@code null}
   */
  protected ApexStreamTupleCoder(Coder<T> valueCoder) {
    this.valueCoder = checkNotNull(valueCoder);
  }

  /** Encodes {@code value} in the {@code NESTED} context. */
  @Override
  public void encode(ApexStreamTuple<T> value, OutputStream outStream)
      throws CoderException, IOException {
    encode(value, outStream, Context.NESTED);
  }

  /**
   * Writes a tuple to {@code outStream}.
   *
   * <p>Wire format: one marker byte ({@code 1} for a watermark, {@code 0} for a data tuple). A
   * watermark is followed by its timestamp as an 8-byte long; a data tuple is followed by one
   * byte holding the union tag and then the payload as written by the value coder.
   *
   * @param value the tuple to encode
   * @param outStream destination stream; it is not closed
   * @param context coding context passed on to the value coder
   * @throws CoderException if {@code value} is {@code null}, is not a {@link DataTuple}, or has a
   *     union tag outside {@code 0..255}
   * @throws IOException if writing to {@code outStream} fails
   */
  @Override
  public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
      throws CoderException, IOException {
    if (value instanceof WatermarkTuple) {
      outStream.write(WATERMARK_MARKER);
      // The wrapper is deliberately not closed: closing it would close the caller's stream.
      new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
      return;
    }
    // Validate everything before the first byte is written so that a rejected tuple does not
    // leave a dangling marker byte in the stream.
    if (!(value instanceof DataTuple)) {
      throw new CoderException(
          "cannot encode "
              + (value == null ? "a null tuple" : "tuple of type " + value.getClass().getName()));
    }
    int unionTag = ((DataTuple<?>) value).getUnionTag();
    if (unionTag < 0 || unionTag > MAX_UNION_TAG) {
      // OutputStream.write(int) keeps only the low 8 bits; a larger tag would be silently
      // truncated and come back as a different tag on decode.
      throw new CoderException(
          "unionTag " + unionTag + " does not fit into a single byte (0.." + MAX_UNION_TAG + ")");
    }
    outStream.write(DATA_MARKER);
    outStream.write(unionTag);
    valueCoder.encode(value.getValue(), outStream, context);
  }

  /** Decodes a tuple in the {@code NESTED} context. */
  @Override
  public ApexStreamTuple<T> decode(InputStream inStream) throws CoderException, IOException {
    return decode(inStream, Context.NESTED);
  }

  /**
   * Reads a tuple written by {@link #encode(ApexStreamTuple, OutputStream, Context)}.
   *
   * @param inStream source stream; it is not closed
   * @param context coding context passed on to the value coder
   * @return a {@link WatermarkTuple} or a {@link DataTuple}, depending on the marker byte
   * @throws EOFException if the stream ends before the marker or the union tag could be read
   * @throws CoderException if the marker byte is neither {@code 0} nor {@code 1}
   * @throws IOException if reading from {@code inStream} fails
   */
  @Override
  public ApexStreamTuple<T> decode(InputStream inStream, Context context)
      throws CoderException, IOException {
    int marker = inStream.read();
    if (marker == WATERMARK_MARKER) {
      // The wrapper is deliberately not closed: closing it would close the caller's stream.
      return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
    }
    if (marker == -1) {
      throw new EOFException("end of stream reached while reading the tuple marker");
    }
    if (marker != DATA_MARKER) {
      throw new CoderException("unexpected tuple marker " + marker);
    }
    int unionTag = inStream.read();
    if (unionTag == -1) {
      throw new EOFException("end of stream reached while reading the union tag");
    }
    return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
  }

  /** Returns the value coder as the single component coder. */
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Arrays.<Coder<?>>asList(valueCoder);
  }

  /**
   * Checks that the value coder is deterministic.
   *
   * @throws NonDeterministicException if the value coder is not deterministic
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    verifyDeterministic(
        this,
        this.getClass().getSimpleName() + " requires a deterministic valueCoder",
        valueCoder);
  }

  /** Returns the value coder. */
  public Coder<T> getValueCoder() {
    return valueCoder;
  }
}
/**
* Central switch that decides whether data tuples received on and emitted from ports should be
* logged. It should be queried once in setup and the result cached in the operator.
*/
final class Logging {
  /**
   * Reports whether tuple tracing is switched on in the given pipeline options.
   *
   * @param options pipeline options whose {@code isTupleTracingEnabled()} flag is read
   * @param operator not consulted by this implementation
   * @return the value of {@code options.isTupleTracingEnabled()}
   */
  public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
    final boolean tupleTracing = options.isTupleTracingEnabled();
    return tupleTracing;
  }
}
}