blob: e0a2827eba57c496d42332bd82e396ad3ae1869e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.tuple;
import java.util.Collections;
import java.util.List;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.GeneralTopologyContext;
/**
 * {@link Tuple} implementation backed by a list of values plus the metadata describing where the tuple came from
 * (source component, source task, stream) and its {@link MessageId}.
 *
 * <p>Field names are not stored on the tuple; they are resolved through the {@link GeneralTopologyContext} each time
 * a field-based accessor is used. In addition to the {@link Tuple} contract, instances carry two optional sampled
 * start timestamps and an ack value that is accumulated by XOR.
 *
 * <p>Equality and hashing are identity-based: two distinct instances are never equal, even with identical contents.
 */
public class TupleImpl implements Tuple {
    private final String srcComponent;
    private final List<Object> values;
    private final int taskId;
    private final String streamId;
    private final GeneralTopologyContext context;
    private final MessageId id;
    // Boxed so that "never sampled" can be represented as null.
    private Long processSampleStartTime;
    private Long executeSampleStartTime;
    private long outAckVal = 0;

    /**
     * Creates a shallow copy of {@code t}. The values list, message id and context are shared with {@code t}, not
     * cloned. When {@code t} is itself a {@code TupleImpl}, its sampled start times and ack value are copied too;
     * for any other {@link Tuple} implementation they keep their defaults ({@code null}, {@code null}, {@code 0}).
     *
     * @param t the tuple to copy
     */
    public TupleImpl(Tuple t) {
        this.values = t.getValues();
        this.taskId = t.getSourceTask();
        this.streamId = t.getSourceStreamId();
        this.id = t.getMessageId();
        this.context = t.getContext();
        this.srcComponent = t.getSourceComponent();
        // Only TupleImpl carries the sampling/ack state. A type test is used rather than catching a
        // ClassCastException so that copying a foreign Tuple does not pay for exception construction.
        if (t instanceof TupleImpl) {
            TupleImpl ti = (TupleImpl) t;
            this.processSampleStartTime = ti.processSampleStartTime;
            this.executeSampleStartTime = ti.executeSampleStartTime;
            this.outAckVal = ti.outAckVal;
        }
    }

    /**
     * Creates a tuple with an explicit message id.
     *
     * <p>When {@code context.doSanityCheck()} is true, the values are wrapped in an unmodifiable view and their
     * count is validated against the output fields that the context declares for the component owning
     * {@code taskId} (as reported by {@code context.getComponentId(taskId)}) on {@code streamId}. When it is false,
     * the list is stored as given and no validation is performed.
     *
     * @param context      topology context used for sanity checking and field lookups
     * @param values       the tuple's values
     * @param srcComponent id of the component that emitted the tuple
     * @param taskId       id of the task that emitted the tuple
     * @param streamId     id of the stream the tuple was emitted on
     * @param id           message id of the tuple
     * @throws IllegalArgumentException if sanity checking is enabled and the number of values differs from the
     *                                  number of declared output fields
     */
    public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
        // Read the flag once so the wrapping decision and the validation decision cannot diverge.
        final boolean sanityCheck = context.doSanityCheck();
        this.values = sanityCheck ? Collections.unmodifiableList(values) : values;
        this.taskId = taskId;
        this.streamId = streamId;
        this.id = id;
        this.context = context;
        this.srcComponent = srcComponent;
        if (sanityCheck) {
            String componentId = context.getComponentId(taskId);
            Fields schema = context.getComponentOutputFields(componentId, streamId);
            if (values.size() != schema.size()) {
                throw new IllegalArgumentException("Tuple created with wrong number of fields. Expected " + schema.size()
                                                   + " fields but got " + values.size() + " fields");
            }
        }
    }

    /**
     * Creates a tuple whose message id is {@code MessageId.makeUnanchored()}. Otherwise behaves exactly like the
     * six-argument constructor.
     *
     * @param context      topology context used for sanity checking and field lookups
     * @param values       the tuple's values
     * @param srcComponent id of the component that emitted the tuple
     * @param taskId       id of the task that emitted the tuple
     * @param streamId     id of the stream the tuple was emitted on
     */
    public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId) {
        this(context, values, srcComponent, taskId, streamId, MessageId.makeUnanchored());
    }

    /**
     * Returns the sampled process start time.
     *
     * @return the value last set via {@link #setProcessSampleStartTime(long)} or copied from another
     *         {@code TupleImpl}, or {@code null} if none was recorded
     */
    public Long getProcessSampleStartTime() {
        return processSampleStartTime;
    }

    /**
     * Records the sampled process start time.
     *
     * @param ms the start time to store
     */
    public void setProcessSampleStartTime(long ms) {
        processSampleStartTime = ms;
    }

    /**
     * Returns the sampled execute start time.
     *
     * @return the value last set via {@link #setExecuteSampleStartTime(long)} or copied from another
     *         {@code TupleImpl}, or {@code null} if none was recorded
     */
    public Long getExecuteSampleStartTime() {
        return executeSampleStartTime;
    }

    /**
     * Records the sampled execute start time.
     *
     * @param ms the start time to store
     */
    public void setExecuteSampleStartTime(long ms) {
        executeSampleStartTime = ms;
    }

    /**
     * Folds {@code val} into the accumulated ack value using XOR.
     *
     * @param val the value to XOR into the current ack value
     */
    public void updateAckVal(long val) {
        outAckVal = outAckVal ^ val;
    }

    /**
     * Returns the accumulated ack value.
     *
     * @return the XOR of all values passed to {@link #updateAckVal(long)}, starting from {@code 0} or from the
     *         value copied from another {@code TupleImpl}
     */
    public long getAckVal() {
        return outAckVal;
    }

    @Override
    public int size() {
        return values.size();
    }

    @Override
    public int fieldIndex(String field) {
        return getFields().fieldIndex(field);
    }

    @Override
    public boolean contains(String field) {
        return getFields().contains(field);
    }

    // ---- Positional accessors ----
    // Each typed accessor is an unchecked downcast of the stored object, so a value of a different runtime type
    // results in a ClassCastException; a stored null is returned as null.

    @Override
    public Object getValue(int i) {
        return values.get(i);
    }

    @Override
    public String getString(int i) {
        return (String) values.get(i);
    }

    @Override
    public Integer getInteger(int i) {
        return (Integer) values.get(i);
    }

    @Override
    public Long getLong(int i) {
        return (Long) values.get(i);
    }

    @Override
    public Boolean getBoolean(int i) {
        return (Boolean) values.get(i);
    }

    @Override
    public Short getShort(int i) {
        return (Short) values.get(i);
    }

    @Override
    public Byte getByte(int i) {
        return (Byte) values.get(i);
    }

    @Override
    public Double getDouble(int i) {
        return (Double) values.get(i);
    }

    @Override
    public Float getFloat(int i) {
        return (Float) values.get(i);
    }

    @Override
    public byte[] getBinary(int i) {
        return (byte[]) values.get(i);
    }

    // ---- Field-name accessors ----
    // Each resolves the field name to a position via fieldIndex(String) and then behaves like the positional
    // accessor of the same type.

    @Override
    public Object getValueByField(String field) {
        return values.get(fieldIndex(field));
    }

    @Override
    public String getStringByField(String field) {
        return (String) values.get(fieldIndex(field));
    }

    @Override
    public Integer getIntegerByField(String field) {
        return (Integer) values.get(fieldIndex(field));
    }

    @Override
    public Long getLongByField(String field) {
        return (Long) values.get(fieldIndex(field));
    }

    @Override
    public Boolean getBooleanByField(String field) {
        return (Boolean) values.get(fieldIndex(field));
    }

    @Override
    public Short getShortByField(String field) {
        return (Short) values.get(fieldIndex(field));
    }

    @Override
    public Byte getByteByField(String field) {
        return (Byte) values.get(fieldIndex(field));
    }

    @Override
    public Double getDoubleByField(String field) {
        return (Double) values.get(fieldIndex(field));
    }

    @Override
    public Float getFloatByField(String field) {
        return (Float) values.get(fieldIndex(field));
    }

    @Override
    public byte[] getBinaryByField(String field) {
        return (byte[]) values.get(fieldIndex(field));
    }

    /**
     * Returns the list held by this tuple directly (no copy is made).
     */
    @Override
    public List<Object> getValues() {
        return values;
    }

    /**
     * Looks up the output fields for this tuple's source component and stream in the context. The lookup is
     * performed on every call; the result is not cached on the tuple.
     */
    @Override
    public Fields getFields() {
        return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
    }

    @Override
    public List<Object> select(Fields selector) {
        return getFields().select(selector, values);
    }

    /**
     * Builds a new {@link GlobalStreamId} from the source component and stream id on each call.
     */
    @Override
    public GlobalStreamId getSourceGlobalStreamId() {
        return new GlobalStreamId(getSourceComponent(), streamId);
    }

    @Override
    public String getSourceComponent() {
        return srcComponent;
    }

    @Override
    public int getSourceTask() {
        return taskId;
    }

    @Override
    public String getSourceStreamId() {
        return streamId;
    }

    @Override
    public MessageId getMessageId() {
        return id;
    }

    @Override
    public GeneralTopologyContext getContext() {
        return context;
    }

    /**
     * Renders the source, stream, message id, values and sampled start times for diagnostics.
     */
    @Override
    public String toString() {
        // Plain concatenation (instead of explicit toString() calls) renders a null id or null values as
        // "null" rather than throwing from a method that is typically used while logging.
        return "source: " + getSourceComponent() + ":" + taskId
               + ", stream: " + streamId
               + ", id: " + id
               + ", " + values + " PROC_START_TIME(sampled): "
               + processSampleStartTime + " EXEC_START_TIME(sampled): " + executeSampleStartTime;
    }

    /**
     * Identity comparison: a tuple is equal only to itself.
     */
    @Override
    public boolean equals(Object other) {
        return this == other;
    }

    /**
     * Identity hash code, consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return System.identityHashCode(this);
    }
}