// blob: 1060d187e5c0f83bf796b5b5b120d7694fd59569 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class that abstracts the common logic for executing bolts in a stateful topology.
*/
public abstract class BaseStatefulBoltExecutor implements IRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class);

    /**
     * Number of checkpoint tuples received so far for each (action, txid) pair that has not yet been
     * seen {@link #checkPointInputTaskCount} times.
     */
    private final Map<TransactionRequest, Integer> transactionRequestCount;

    /** Collector assigned in {@link #init(TopologyContext, OutputCollector)}; used here to ack, fail and report errors. */
    protected OutputCollector collector;

    /** How many checkpoint tuples must arrive for one (action, txid) pair before it is handled. */
    private int checkPointInputTaskCount;

    /** Reference txid used to detect old checkpoints; starts at {@code Long.MIN_VALUE} so the first txid is accepted. */
    private long lastTxid = Long.MIN_VALUE;

    /**
     * Creates the executor with an empty checkpoint counting table.
     */
    public BaseStatefulBoltExecutor() {
        transactionRequestCount = new HashMap<>();
    }

    /**
     * Stores the collector and computes how many checkpoint tuples are expected per transaction.
     *
     * @param context   the topology context used to look up the checkpoint sources of this component
     * @param collector the collector kept in {@link #collector}
     */
    protected void init(TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
    }

    /**
     * Sums the task counts of every source component that feeds this component over the checkpoint stream.
     *
     * @param context the topology context of this component
     * @return the total number of upstream tasks connected through the checkpoint stream
     */
    private int getCheckpointInputTaskCount(TopologyContext context) {
        int total = 0;
        for (GlobalStreamId source : context.getThisSources().keySet()) {
            if (!CHECKPOINT_STREAM_ID.equals(source.get_streamId())) {
                // Not a checkpoint stream, so it does not contribute to the expected count.
                continue;
            }
            total += context.getComponentTasks(source.get_componentId()).size();
        }
        return total;
    }

    /**
     * Dispatches a tuple: checkpoint tuples go through the checkpoint bookkeeping, everything else is
     * handed to {@link #handleTuple(Tuple)}.
     *
     * @param input the incoming tuple
     */
    @Override
    public void execute(Tuple input) {
        if (!CheckpointSpout.isCheckpoint(input)) {
            handleTuple(input);
            return;
        }
        processCheckpoint(input);
    }

    /**
     * Counts the checkpoint tuple and, once the expected number of tuples for its (action, txid) pair
     * has arrived, invokes {@link #handleCheckpoint(Tuple, CheckPointState.Action, long)}.
     *
     * <p>Tuples that arrive before the count is complete are acked right away. A completed checkpoint
     * whose txid is lower than {@link #lastTxid} is acked and ignored. Any {@link Throwable} raised while
     * handling a completed checkpoint causes the tuple to be failed and the error to be reported.
     *
     * @param input the checkpoint tuple
     */
    private void processCheckpoint(Tuple input) {
        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);

        if (!shouldProcessTransaction(action, txid)) {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, "
                      + "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
            collector.ack(input);
            return;
        }

        LOG.debug("Processing action {}, txid {}", action, txid);
        try {
            if (txid < lastTxid) {
                LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                collector.ack(input);
            } else {
                // The tuple is not acked here on this path; presumably the subclass does so - confirm in implementations.
                handleCheckpoint(input, action, txid);
                // After a rollback the reference txid is moved one step back; otherwise it tracks the handled txid.
                lastTxid = (action == ROLLBACK) ? txid - 1 : txid;
            }
        } catch (Throwable th) {
            LOG.error("Got error while processing checkpoint tuple", th);
            collector.fail(input);
            collector.reportError(th);
        }
    }

    /**
     * Records one more checkpoint tuple for the given (action, txid) pair and tells whether the expected
     * number of tuples has now been reached. When it has, the pair's counter is discarded.
     *
     * @param action the checkpoint action carried by the tuple
     * @param txid   the transaction id carried by the tuple
     * @return {@code true} if the running count for this pair now equals {@link #checkPointInputTaskCount}
     */
    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
        TransactionRequest request = new TransactionRequest(action, txid);
        Integer previous = transactionRequestCount.get(request);
        int current = (previous == null) ? 1 : previous + 1;
        transactionRequestCount.put(request, current);
        if (current != checkPointInputTaskCount) {
            return false;
        }
        transactionRequestCount.remove(request);
        return true;
    }

    /**
     * Declares the checkpoint stream with the txid and action fields on the given declarer.
     *
     * @param declarer the declarer on which the checkpoint stream is registered
     */
    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
        Fields checkpointFields = new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION);
        declarer.declareStream(CHECKPOINT_STREAM_ID, checkpointFields);
    }

    /**
     * Sub-classes implement the handling of a regular (non-checkpoint) tuple here.
     *
     * @param input the input tuple
     */
    protected abstract void handleTuple(Tuple input);

    /**
     * Sub-classes implement the handling of a checkpoint here. It is invoked once the expected number
     * of checkpoint tuples for the given action and txid has been received.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected abstract void handleCheckpoint(Tuple checkpointTuple, CheckPointState.Action action, long txid);

    /**
     * Output collector that rejects the emit variants overridden below by throwing
     * {@link UnsupportedOperationException}; every other operation is inherited from {@link OutputCollector}.
     */
    protected static class AnchoringOutputCollector extends OutputCollector {
        private static final String UNANCHORED_EMIT_MESSAGE = "Bolts in a stateful topology must emit anchored tuples.";

        AnchoringOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        /**
         * Always throws, since this variant carries no anchor.
         */
        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            throw new UnsupportedOperationException(UNANCHORED_EMIT_MESSAGE);
        }

        /**
         * Always throws, since this variant carries no anchor.
         */
        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            throw new UnsupportedOperationException(UNANCHORED_EMIT_MESSAGE);
        }
    }

    /**
     * Immutable (action, txid) pair used as the key of the checkpoint counting table.
     */
    private static class TransactionRequest {
        private final CheckPointState.Action action;
        private final long txid;

        TransactionRequest(CheckPointState.Action action, long txid) {
            this.action = action;
            this.txid = txid;
        }

        /**
         * Two requests are equal when they are of the same class and have the same txid and the same
         * (possibly null) action.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            TransactionRequest other = (TransactionRequest) o;
            if (txid != other.txid) {
                return false;
            }
            if (action == null) {
                return other.action == null;
            }
            return action.equals(other.action);
        }

        /**
         * Combines the action hash (0 when the action is null) with the folded 64-bit txid.
         */
        @Override
        public int hashCode() {
            int actionHash = (action == null) ? 0 : action.hashCode();
            int txidHash = (int) (txid ^ (txid >>> 32));
            return 31 * actionHash + txidHash;
        }

        @Override
        public String toString() {
            return new StringBuilder("TransactionRequest{")
                .append("action='").append(action).append('\'')
                .append(", txid=").append(txid)
                .append('}')
                .toString();
        }
    }
}