blob: 3c982bcc1c0a216e77e2e4625a9563dfbb6057f3 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.instance;
import java.util.logging.Logger;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.ByteAmount;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.proto.system.HeronTuples;
/**
* Implements OutgoingTupleCollection will be able to handle some basic methods for send out tuples
* 1. initNewControlTuple or initNewDataTuple
* 2. addDataTuple, addAckTuple and addFailTuple
* 3. flushRemaining tuples and sent out the tuples
* <p>
* In fact, when talking about to send out tuples, we mean we push them to the out queues.
*/
public class OutgoingTupleCollection {
private static final Logger LOG = Logger.getLogger(OutgoingTupleCollection.class.getName());
protected final String componentName;
// We have just one outQueue responsible for both control tuples and data tuples
private final Communicator<HeronTuples.HeronTupleSet> outQueue;
// Maximum data tuple size in bytes we can put in one HeronTupleSet
private final ByteAmount maxDataTupleSize;
private final int dataTupleSetCapacity;
private final int controlTupleSetCapacity;
private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
// Total data emitted in bytes for the entire life
private long totalDataEmittedInBytes;
// Current size in bytes for data types to pack into the HeronTupleSet
private long currentDataTupleSizeInBytes;
public OutgoingTupleCollection(
String componentName,
Communicator<HeronTuples.HeronTupleSet> outQueue) {
this.outQueue = outQueue;
this.componentName = componentName;
SystemConfig systemConfig =
(SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
// Initialize the values in constructor
this.totalDataEmittedInBytes = 0;
this.currentDataTupleSizeInBytes = 0;
// Read the config values
this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
}
public void sendOutTuples() {
flushRemaining();
}
public void addDataTuple(
String streamId,
HeronTuples.HeronDataTuple.Builder newTuple,
long tupleSizeInBytes) {
if (currentDataTuple == null
|| !currentDataTuple.getStream().getId().equals(streamId)
|| currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
|| currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
initNewDataTuple(streamId);
}
currentDataTuple.addTuples(newTuple);
if (tupleSizeInBytes > 8 * 1024 * 1024) {
LOG.info("Large tuple size!!!!: " + tupleSizeInBytes);
}
currentDataTupleSizeInBytes += tupleSizeInBytes;
totalDataEmittedInBytes += tupleSizeInBytes;
}
public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
if (currentControlTuple == null
|| currentControlTuple.getFailsCount() > 0
|| currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
initNewControlTuple();
}
currentControlTuple.addAcks(newTuple);
// Add the size of data in bytes ready to send out
totalDataEmittedInBytes += tupleSizeInBytes;
}
public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
if (currentControlTuple == null
|| currentControlTuple.getAcksCount() > 0
|| currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
initNewControlTuple();
}
currentControlTuple.addFails(newTuple);
// Add the size of data in bytes ready to send out
totalDataEmittedInBytes += tupleSizeInBytes;
}
private void initNewDataTuple(String streamId) {
flushRemaining();
// Reset the set for data tuple
currentDataTupleSizeInBytes = 0;
TopologyAPI.StreamId.Builder sbldr = TopologyAPI.StreamId.newBuilder();
sbldr.setId(streamId);
sbldr.setComponentName(componentName);
currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
currentDataTuple.setStream(sbldr);
}
private void initNewControlTuple() {
flushRemaining();
currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
}
private void flushRemaining() {
if (currentDataTuple != null) {
HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
bldr.setData(currentDataTuple);
pushTupleToQueue(bldr, outQueue);
currentDataTuple = null;
}
if (currentControlTuple != null) {
HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
bldr.setControl(currentControlTuple);
pushTupleToQueue(bldr, outQueue);
currentControlTuple = null;
}
}
private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder bldr,
Communicator<HeronTuples.HeronTupleSet> out) {
// The Communicator has un-bounded capacity so the offer will always be successful
out.offer(bldr.build());
}
// Return true we could offer item to outQueue
public boolean isOutQueuesAvailable() {
return outQueue.size() < outQueue.getExpectedAvailableCapacity();
}
public long getTotalDataEmittedInBytes() {
return totalDataEmittedInBytes;
}
// Clean the internal state of OutgoingTupleCollection
public void clear() {
currentControlTuple = null;
currentDataTuple = null;
outQueue.clear();
}
}