blob: 3768cb1390d2284699e81f3f40b54d0f206525c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.transactional;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.transactional.state.RotatingTransactionalState;
import backtype.storm.transactional.state.TransactionalState;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.math.BigInteger;
import java.util.Map;
import java.util.TreeMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionalSpoutCoordinator extends BaseRichSpout {
public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);
public static final BigInteger INIT_TXID = BigInteger.ONE;
public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";
public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit";
private static final String CURRENT_TX = "currtx";
private static final String META_DIR = "meta";
private ITransactionalSpout _spout;
private ITransactionalSpout.Coordinator _coordinator;
private TransactionalState _state;
private RotatingTransactionalState _coordinatorState;
TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
private SpoutOutputCollector _collector;
private Random _rand;
BigInteger _currTransaction;
int _maxTransactionActive;
StateInitializer _initializer;
public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
_spout = spout;
}
public ITransactionalSpout getSpout() {
return _spout;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_rand = new Random(Utils.secureRandomLong());
_state = TransactionalState.newCoordinatorState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), _spout.getComponentConfiguration());
_coordinatorState = new RotatingTransactionalState(_state, META_DIR, true);
_collector = collector;
_coordinator = _spout.getCoordinator(conf, context);
_currTransaction = getStoredCurrTransaction(_state);
Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
if (active == null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = Utils.getInt(active);
}
_initializer = new StateInitializer();
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
sync();
}
@Override
public void ack(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
if (status != null && tx.equals(status.attempt)) {
if (status.status == AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
} else if (status.status == AttemptStatus.COMMITTING) {
_activeTx.remove(tx.getTransactionId());
_coordinatorState.cleanupBefore(tx.getTransactionId());
_currTransaction = nextTransactionId(tx.getTransactionId());
_state.setData(CURRENT_TX, _currTransaction);
}
sync();
}
}
@Override
public void fail(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
if (stored != null && tx.equals(stored.attempt)) {
_activeTx.tailMap(tx.getTransactionId()).clear();
sync();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
// when it sees the earlier txid it should know to emit nothing
declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
}
try {
if (_activeTx.size() < _maxTransactionActive) {
BigInteger curr = _currTransaction;
for (int i = 0; i < _maxTransactionActive; i++) {
if ((_coordinatorState.hasCache(curr) || _coordinator.isReady()) && !_activeTx.containsKey(curr)) {
TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
Object state = _coordinatorState.getState(curr, _initializer);
_activeTx.put(curr, new TransactionStatus(attempt));
_collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
}
curr = nextTransactionId(curr);
}
}
} catch (FailedException e) {
LOG.warn("Failed to get metadata for a transaction", e);
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
private static enum AttemptStatus {
PROCESSING, PROCESSED, COMMITTING
}
private static class TransactionStatus {
TransactionAttempt attempt;
AttemptStatus status;
public TransactionStatus(TransactionAttempt attempt) {
this.attempt = attempt;
this.status = AttemptStatus.PROCESSING;
}
@Override
public String toString() {
return attempt.toString() + " <" + status.toString() + ">";
}
}
private BigInteger nextTransactionId(BigInteger id) {
return id.add(BigInteger.ONE);
}
private BigInteger previousTransactionId(BigInteger id) {
if (id.equals(INIT_TXID)) {
return null;
} else {
return id.subtract(BigInteger.ONE);
}
}
private BigInteger getStoredCurrTransaction(TransactionalState state) {
BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
if (ret == null)
return INIT_TXID;
else
return ret;
}
private class StateInitializer implements RotatingTransactionalState.StateInitializer {
@Override
public Object init(BigInteger txid, Object lastState) {
return _coordinator.initializeTransaction(txid, lastState);
}
}
}