blob: ef5b52bd6053a695949ad05c88e50dbca4470086 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.resource;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import com.twitter.heron.api.bolt.BaseRichBolt;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.common.basics.SingletonRegistry;
/**
* A Bolt used for unit test, it will execute and modify some singletons' value:
* 1. It will tuple.getString(0) and append to the singleton "received-string-list"
* 2. It will increment singleton "execute-count" when it executes once
* 3. It will ack the tuple and increment singleton Constants.ACK_COUNT when # of tuples executed is odd
* 4. It will fail the tuple and increment singleton Constants.FAIL_COUNT when # of tuples executed is even
*/
@Ignore
public class TestBolt extends BaseRichBolt {
private static final long serialVersionUID = -5160420613503624743L;
private OutputCollector outputCollector;
private int tupleExecuted = 0;
@Override
public void prepare(
Map<String, Object> map,
TopologyContext topologyContext,
OutputCollector collector) {
this.outputCollector = collector;
}
@Override
public void execute(Tuple tuple) {
AtomicInteger ackCount =
(AtomicInteger) SingletonRegistry.INSTANCE.getSingleton(Constants.ACK_COUNT);
AtomicInteger failCount =
(AtomicInteger) SingletonRegistry.INSTANCE.getSingleton(Constants.FAIL_COUNT);
AtomicInteger tupleExecutedCount =
(AtomicInteger) SingletonRegistry.INSTANCE.getSingleton("execute-count");
StringBuilder receivedStrings =
(StringBuilder) SingletonRegistry.INSTANCE.getSingleton("received-string-list");
if (receivedStrings != null) {
receivedStrings.append(tuple.getString(0));
}
if (tupleExecutedCount != null) {
tupleExecutedCount.getAndIncrement();
}
if ((tupleExecuted & 1) == 0) {
outputCollector.ack(tuple);
if (ackCount != null) {
ackCount.getAndIncrement();
}
} else {
outputCollector.fail(tuple);
if (failCount != null) {
failCount.getAndIncrement();
}
}
tupleExecuted++;
outputCollector.emit(new Values(tuple.getString(0)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}