blob: d2202c4c727133aab16150419d638a6fafac4f5f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import static org.apache.storm.spout.CheckPointState.Action;
import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
import static org.apache.storm.spout.CheckPointState.Action.PREPARE;
import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wraps a {@link IStatefulBolt} and manages the state of the bolt.
*/
public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class);
private final IStatefulBolt<T> bolt;
private State state;
private boolean boltInitialized = false;
private List<Tuple> pendingTuples = new ArrayList<>();
private List<Tuple> preparedTuples = new ArrayList<>();
private AckTrackingOutputCollector collector;
public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
this.bolt = bolt;
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
// get the last successfully committed state from state store
String namespace = context.getThisComponentId() + "-" + context.getThisTaskId();
prepare(topoConf, context, collector, StateFactory.getState(namespace, topoConf, context));
}
// package access for unit tests
void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector, State state) {
init(context, collector);
this.collector = new AckTrackingOutputCollector(collector);
bolt.prepare(topoConf, context, this.collector);
this.state = state;
}
@Override
public void cleanup() {
bolt.cleanup();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
bolt.declareOutputFields(declarer);
declareCheckpointStream(declarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}
@Override
protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
if (action == PREPARE) {
if (boltInitialized) {
bolt.prePrepare(txid);
state.prepareCommit(txid);
preparedTuples.addAll(collector.ackedTuples());
} else {
/*
* May be the task restarted in the middle and the state needs be initialized.
* Fail fast and trigger recovery.
*/
LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
collector.fail(checkpointTuple);
return;
}
} else if (action == COMMIT) {
bolt.preCommit(txid);
state.commit(txid);
ack(preparedTuples);
} else if (action == ROLLBACK) {
bolt.preRollback();
state.rollback();
fail(preparedTuples);
fail(collector.ackedTuples());
} else if (action == INITSTATE) {
if (!boltInitialized) {
bolt.initState((T) state);
boltInitialized = true;
LOG.debug("{} pending tuples to process", pendingTuples.size());
for (Tuple tuple : pendingTuples) {
doExecute(tuple);
}
pendingTuples.clear();
} else {
/*
* If a worker crashes, the states of all workers are rolled back and an initState message is sent across
* the topology so that crashed workers can initialize their state.
* The bolts that have their state already initialized need not be re-initialized.
*/
LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
checkpointTuple, action, txid);
}
}
collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
collector.delegate.ack(checkpointTuple);
}
@Override
protected void handleTuple(Tuple input) {
if (boltInitialized) {
doExecute(input);
} else {
LOG.debug("Bolt state not initialized, adding tuple {} to pending tuples", input);
pendingTuples.add(input);
}
}
private void doExecute(Tuple tuple) {
bolt.execute(tuple);
}
private void ack(List<Tuple> tuples) {
if (!tuples.isEmpty()) {
LOG.debug("Acking {} tuples", tuples.size());
for (Tuple tuple : tuples) {
collector.delegate.ack(tuple);
}
tuples.clear();
}
}
private void fail(List<Tuple> tuples) {
if (!tuples.isEmpty()) {
LOG.debug("Failing {} tuples", tuples.size());
for (Tuple tuple : tuples) {
collector.fail(tuple);
}
tuples.clear();
}
}
private static class AckTrackingOutputCollector extends AnchoringOutputCollector {
private final OutputCollector delegate;
private final Queue<Tuple> ackedTuples;
AckTrackingOutputCollector(OutputCollector delegate) {
super(delegate);
this.delegate = delegate;
this.ackedTuples = new ConcurrentLinkedQueue<>();
}
List<Tuple> ackedTuples() {
List<Tuple> result = new ArrayList<>();
Iterator<Tuple> it = ackedTuples.iterator();
while (it.hasNext()) {
result.add(it.next());
it.remove();
}
return result;
}
@Override
public void ack(Tuple input) {
ackedTuples.add(input);
}
}
}