blob: 6ad83439953926da810910257706eee8484fc88b [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.common.utils.tuple;
import java.util.List;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.proto.system.HeronTuples;
/**
* The tuple is the main data structure in Heron. A tuple is a named list of values,
* where each value can be any type. Tuples are dynamically typed -- the types of the fields
* do not need to be declared. Tuples have helper methods like getInteger and getString
* to get field values without having to cast the result.
* <p>
* Heron needs to know how to serialize all the values in a tuple. By default, Heron
* knows how to serialize the primitive types, strings, and byte arrays. If you want to
* use another type, you'll need to implement and register a serializer for that type.
*
* @see <a href="https://storm.apache.org/documentation/Serialization.html">Storm serialization</a>
*/
public class TupleImpl implements Tuple {
  /** Context used to look up the declared output fields of the source stream. */
  private final TopologyContext context;
  /** Stream (component name + stream id) this tuple was emitted on. */
  private final TopologyAPI.StreamId stream;
  /** Key returned by {@link #getTupleKey()}. */
  private final long tupleKey;
  /** Root ids returned by {@link #getRoots()}. */
  private final List<HeronTuples.RootId> roots;
  /** Creation timestamp returned by {@link #getCreationTime()}. */
  private final long creationTime;
  /** Task id returned by {@link #getSourceTask()}. */
  private final int sourceTaskId;
  /** Field values; not final because {@link #resetValues()} sets it to {@code null}. */
  private List<Object> values;

  /**
   * Creates a tuple stamped with the current {@link System#nanoTime()} as its creation time
   * and with the field-count check enabled.
   *
   * @param context topology context used to resolve the stream's declared fields
   * @param stream stream the tuple belongs to
   * @param tupleKey key of this tuple
   * @param roots root ids associated with this tuple
   * @param values field values, stored by reference (no copy is made)
   * @param sourceTaskId id reported by {@link #getSourceTask()}
   * @throws IllegalArgumentException if the number of values differs from the number of
   *     fields declared for the stream
   */
  public TupleImpl(TopologyContext context, TopologyAPI.StreamId stream,
                   long tupleKey, List<HeronTuples.RootId> roots,
                   List<Object> values, int sourceTaskId) {
    this(context, stream, tupleKey, roots, values, System.nanoTime(), true, sourceTaskId);
  }

  /**
   * Creates a tuple with an explicit creation time and an optional field-count check.
   *
   * @param context topology context used to resolve the stream's declared fields
   * @param stream stream the tuple belongs to
   * @param tupleKey key of this tuple
   * @param roots root ids associated with this tuple
   * @param values field values, stored by reference (no copy is made)
   * @param creationTime value reported by {@link #getCreationTime()}
   * @param isCheckRequired when {@code true}, verify that {@code values} has exactly as many
   *     entries as the stream's declared output fields
   * @param sourceTaskId id reported by {@link #getSourceTask()}
   * @throws IllegalArgumentException if the check is requested and the sizes differ
   */
  public TupleImpl(TopologyContext context, TopologyAPI.StreamId stream,
                   long tupleKey, List<HeronTuples.RootId> roots,
                   List<Object> values, long creationTime, boolean isCheckRequired,
                   int sourceTaskId) {
    this.context = context;
    this.stream = stream;
    this.tupleKey = tupleKey;
    this.roots = roots;
    this.values = values;
    this.creationTime = creationTime;
    this.sourceTaskId = sourceTaskId;

    if (isCheckRequired) {
      Fields schema = context.getComponentOutputFields(stream.getComponentName(),
          stream.getId());
      if (values.size() != schema.size()) {
        throw new IllegalArgumentException(
            "Tuple created with wrong number of fields. "
                + "Expected " + schema.size() + " fields but got "
                + values.size() + " fields"
        );
      }
    }
  }

  /** Returns the root id list passed to the constructor (same reference, not a copy). */
  public List<HeronTuples.RootId> getRoots() {
    return roots;
  }

  /** Returns the tuple key passed to the constructor. */
  public long getTupleKey() {
    return tupleKey;
  }

  /** Returns the number of values currently held by this tuple. */
  @Override
  public int size() {
    return values.size();
  }

  /** Returns the position of {@code field} as reported by {@link #getFields()}. */
  @Override
  public int fieldIndex(String field) {
    return getFields().fieldIndex(field);
  }

  /** Returns whether {@link #getFields()} contains a field named {@code field}. */
  @Override
  public boolean contains(String field) {
    return getFields().contains(field);
  }

  /** Returns the value at position {@code i} without any cast. */
  @Override
  public Object getValue(int i) {
    return values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link String}.
   *
   * @throws ClassCastException if the stored value is not a {@code String}
   */
  @Override
  public String getString(int i) {
    return (String) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Integer}.
   *
   * @throws ClassCastException if the stored value is not an {@code Integer}
   */
  @Override
  public Integer getInteger(int i) {
    return (Integer) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Long}.
   *
   * @throws ClassCastException if the stored value is not a {@code Long}
   */
  @Override
  public Long getLong(int i) {
    return (Long) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Boolean}.
   *
   * @throws ClassCastException if the stored value is not a {@code Boolean}
   */
  @Override
  public Boolean getBoolean(int i) {
    return (Boolean) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Short}.
   *
   * @throws ClassCastException if the stored value is not a {@code Short}
   */
  @Override
  public Short getShort(int i) {
    return (Short) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Byte}.
   *
   * @throws ClassCastException if the stored value is not a {@code Byte}
   */
  @Override
  public Byte getByte(int i) {
    return (Byte) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Double}.
   *
   * @throws ClassCastException if the stored value is not a {@code Double}
   */
  @Override
  public Double getDouble(int i) {
    return (Double) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@link Float}.
   *
   * @throws ClassCastException if the stored value is not a {@code Float}
   */
  @Override
  public Float getFloat(int i) {
    return (Float) values.get(i);
  }

  /**
   * Returns the value at position {@code i} cast to {@code byte[]}.
   *
   * @throws ClassCastException if the stored value is not a {@code byte[]}
   */
  @Override
  public byte[] getBinary(int i) {
    return (byte[]) values.get(i);
  }

  /** Returns the value stored at the index of {@code field}, without any cast. */
  @Override
  public Object getValueByField(String field) {
    return values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link String}.
   *
   * @throws ClassCastException if the stored value is not a {@code String}
   */
  @Override
  public String getStringByField(String field) {
    return (String) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Integer}.
   *
   * @throws ClassCastException if the stored value is not an {@code Integer}
   */
  @Override
  public Integer getIntegerByField(String field) {
    return (Integer) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Long}.
   *
   * @throws ClassCastException if the stored value is not a {@code Long}
   */
  @Override
  public Long getLongByField(String field) {
    return (Long) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Boolean}.
   *
   * @throws ClassCastException if the stored value is not a {@code Boolean}
   */
  @Override
  public Boolean getBooleanByField(String field) {
    return (Boolean) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Short}.
   *
   * @throws ClassCastException if the stored value is not a {@code Short}
   */
  @Override
  public Short getShortByField(String field) {
    return (Short) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Byte}.
   *
   * @throws ClassCastException if the stored value is not a {@code Byte}
   */
  @Override
  public Byte getByteByField(String field) {
    return (Byte) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Double}.
   *
   * @throws ClassCastException if the stored value is not a {@code Double}
   */
  @Override
  public Double getDoubleByField(String field) {
    return (Double) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@link Float}.
   *
   * @throws ClassCastException if the stored value is not a {@code Float}
   */
  @Override
  public Float getFloatByField(String field) {
    return (Float) values.get(fieldIndex(field));
  }

  /**
   * Returns the value stored at the index of {@code field}, cast to {@code byte[]}.
   *
   * @throws ClassCastException if the stored value is not a {@code byte[]}
   */
  @Override
  public byte[] getBinaryByField(String field) {
    return (byte[]) values.get(fieldIndex(field));
  }

  /**
   * Returns the backing value list itself (not a copy), or {@code null} once
   * {@link #resetValues()} has been called.
   */
  @Override
  public List<Object> getValues() {
    return values;
  }

  /**
   * Looks up the output fields declared for this tuple's source component and stream.
   * The lookup goes through the topology context on every call; nothing is cached here.
   */
  @Override
  public Fields getFields() {
    return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
  }

  /** Returns the result of applying {@code selector} to this tuple's values. */
  @Override
  public List<Object> select(Fields selector) {
    return getFields().select(selector, values);
  }

  /** Returns the stream id object passed to the constructor. */
  @Override
  public TopologyAPI.StreamId getSourceGlobalStreamId() {
    return stream;
  }

  /** Returns the component name recorded in this tuple's stream. */
  @Override
  public String getSourceComponent() {
    return stream.getComponentName();
  }

  /** Returns the source task id passed to the constructor. */
  @Override
  public int getSourceTask() {
    return sourceTaskId;
  }

  /** Returns the stream id string recorded in this tuple's stream. */
  @Override
  public String getSourceStreamId() {
    return stream.getId();
  }

  /**
   * Renders the source component, stream id and values for logging.
   * Safe to call after {@link #resetValues()}; the values are then printed as {@code null}.
   */
  @Override
  public String toString() {
    // Plain concatenation goes through String.valueOf, so a null value list prints "null"
    // instead of throwing NullPointerException as values.toString() would.
    return "source: " + getSourceComponent() + ", stream: " + getSourceStreamId()
        + ", " + values;
  }

  /** Tuples are compared by identity only: two distinct instances are never equal. */
  @Override
  public boolean equals(Object other) {
    return this == other;
  }

  /** Identity hash code, consistent with the identity-based {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return System.identityHashCode(this);
  }

  /**
   * Drops the reference to the value list. Afterwards {@link #getValues()} returns
   * {@code null} and the positional/by-field accessors and {@link #size()} will throw
   * {@link NullPointerException}.
   */
  @Override
  public void resetValues() {
    values = null;
  }

  /** Returns the creation time recorded at construction. */
  public long getCreationTime() {
    return creationTime;
  }
}