// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.heron.integration_test.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.bolt.IRichBolt;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
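
/**
 * A bolt wrapper used by the integration test framework. It delegates execution to the
 * user's bolt, counts the data tuples it receives, and listens on the control stream for
 * terminal signals; once a terminal has arrived from every upstream instance it invokes
 * finishBatch() on batch bolts and forwards a terminal to downstream components.
 */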
public class IntegrationTestBolt implements IRichBolt, IUpdatable {
private static final long serialVersionUID = 6304554167838679097L;
private static final Logger LOG = Logger.getLogger(IntegrationTestBolt.class.getName());
private final IRichBolt delegateBolt;
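// Number of terminal signals still expected from upstream component instances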
private int terminalsToReceive = 0;
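// Data tuples received vs. tuples acked/failed, used to guard against double ack/fail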
private long tuplesReceived = 0;
private long tuplesProcessed = 0;
// The tuple currently being processed; used as the default anchor for emits from the user's bolt
private Tuple currentTupleProcessing = null;
private OutputCollector collector;
public IntegrationTestBolt(IRichBolt delegate) {
this.delegateBolt = delegate;
}
@Override
public void update(TopologyContext topologyContext) {
LOG.info("update called with TopologyContext: " + topologyContext);
// If we get a new topology context, we reset terminalsToReceive regardless of whether we've
// already received any. The expectation is that after a change in the physical plan, upstream
// spouts will re-emit and send new terminals.
this.terminalsToReceive = calculateTerminalsToReceive(topologyContext);
}
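
// Resets the terminal count and wraps the user-supplied collector before delegating to the user's bolt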
@Override
public void prepare(Map<String, Object> map,
TopologyContext context,
OutputCollector outputCollector) {
update(context);
this.collector = new OutputCollector(new IntegrationTestBoltCollector(outputCollector));
this.delegateBolt.prepare(map, context, collector);
}
private int calculateTerminalsToReceive(TopologyContext context) {
int total = 0;
// The number of terminal signals to receive equals the total number of instances
// across all upstream components.
HashSet<String> upstreamComponents = new HashSet<String>();
for (TopologyAPI.StreamId streamId : context.getThisSources().keySet()) {
upstreamComponents.add(streamId.getComponentName());
}
for (String name : upstreamComponents) {
total += context.getComponentTasks(name).size();
}
LOG.info("TerminalsToReceive: " + total);
return total;
}
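
// Control-stream tuples are terminal signals: once one has been received from every upstream
// instance, finish any pending batch and forward a terminal downstream. All other tuples are
// handed to the user's bolt and then acked.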
@Override
public void execute(Tuple tuple) {
String streamID = tuple.getSourceStreamId();
LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
if (streamID.equals(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID)) {
terminalsToReceive--;
if (terminalsToReceive == 0) {
// invoke the finishBatch() callback if necessary
if (IBatchBolt.class.isInstance(delegateBolt)) {
LOG.fine("Invoke bolt to do finishBatch!");
((IBatchBolt) delegateBolt).finishBatch();
}
LOG.info("Received the last terminal, populating the terminals to downstream");
collector.emit(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID,
tuple,
new Values(Constants.INTEGRATION_TEST_TERMINAL));
} else {
LOG.info(String.format(
"Received a terminal, need to receive %s more", terminalsToReceive));
}
} else {
tuplesReceived++;
currentTupleProcessing = tuple;
delegateBolt.execute(tuple);
// Ack the data tuple on behalf of the user's logic; the wrapping collector
// guards against double-acking if the bolt already acked it.
collector.ack(tuple);
}
}
@Override
public void cleanup() {
delegateBolt.cleanup();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID,
new Fields(Constants.INTEGRATION_TEST_TERMINAL));
delegateBolt.declareOutputFields(outputFieldsDeclarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return delegateBolt.getComponentConfiguration();
}
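
// Collector wrapper that anchors un-anchored emits to the tuple currently being processed
// and makes sure each received tuple is acked or failed at most once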
private class IntegrationTestBoltCollector implements IOutputCollector {
private final IOutputCollector delegate;
IntegrationTestBoltCollector(IOutputCollector delegate) {
this.delegate = delegate;
}
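
// A null anchor collection means the user's bolt emitted without anchors; in that case
// anchor the emit to the tuple currently being processed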
@Override
public List<Integer> emit(String s, Collection<Tuple> tuples, List<Object> objects) {
if (tuples == null) {
return delegate.emit(s, Arrays.asList(currentTupleProcessing), objects);
}
return delegate.emit(s, tuples, objects);
}
@Override
public void emitDirect(int i, String s, Collection<Tuple> tuples, List<Object> objects) {
if (tuples == null) {
// With no anchors provided, anchor to the tuple currently being processed, mirroring emit()
delegate.emitDirect(i, s, Arrays.asList(currentTupleProcessing), objects);
return;
}
delegate.emitDirect(i, s, tuples, objects);
}
@Override
public void ack(Tuple tuple) {
LOG.fine("Try to do a ack. tuplesProcessed: "
+ tuplesProcessed + " ; tuplesReceived: " + tuplesReceived);
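// Only ack while there are received tuples that have not yet been acked or failed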
if (tuplesProcessed < tuplesReceived) {
delegate.ack(tuple);
tuplesProcessed++;
}
}
@Override
public void fail(Tuple tuple) {
LOG.fine("Try to do a fail. tuplesProcessed: "
+ tuplesProcessed + " ; tuplesReceived: " + tuplesReceived);
if (tuplesProcessed < tuplesReceived) {
delegate.fail(tuple);
tuplesProcessed++;
}
}
@Override
public void reportError(Throwable throwable) {
delegate.reportError(throwable);
}
}
}