blob: 78cdd8d983ee5e9fb7ef4c831cedebd301942699 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.trident.topology;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.WindowedTimeThrottler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.state.TransactionalState;
public class MasterBatchCoordinator extends BaseRichSpout {
public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
public static final long INIT_TXID = 1L;
public static final String BATCH_STREAM_ID = "$batch";
public static final String COMMIT_STREAM_ID = "$commit";
public static final String SUCCESS_STREAM_ID = "$success";
private static final String CURRENT_TX = "currtx";
private static final String CURRENT_ATTEMPTS = "currattempts";
private List<TransactionalState> _states = new ArrayList();
TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
TreeMap<Long, Integer> _attemptIds;
private SpoutOutputCollector _collector;
Long _currTransaction;
int _maxTransactionActive;
List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
List<String> _managedSpoutIds;
List<ITridentSpout> _spouts;
WindowedTimeThrottler _throttler;
boolean _active = true;
public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
if(spoutIds.isEmpty()) {
throw new IllegalArgumentException("Must manage at least one spout");
}
_managedSpoutIds = spoutIds;
_spouts = spouts;
}
public List<String> getManagedSpoutIds(){
return _managedSpoutIds;
}
@Override
public void activate() {
_active = true;
}
@Override
public void deactivate() {
_active = false;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
for(String spoutId: _managedSpoutIds) {
_states.add(TransactionalState.newCoordinatorState(conf, spoutId));
}
_currTransaction = getStoredCurrTransaction();
_collector = collector;
Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
if(active==null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = active.intValue();
}
_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
for(int i=0; i<_spouts.size(); i++) {
String txId = _managedSpoutIds.get(i);
_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
}
}
@Override
public void close() {
for(TransactionalState state: _states) {
state.close();
}
}
@Override
public void nextTuple() {
sync();
}
@Override
public void ack(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
if(status!=null && tx.equals(status.attempt)) {
if(status.status==AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
} else if(status.status==AttemptStatus.COMMITTING) {
_activeTx.remove(tx.getTransactionId());
_attemptIds.remove(tx.getTransactionId());
_collector.emit(SUCCESS_STREAM_ID, new Values(tx));
_currTransaction = nextTransactionId(tx.getTransactionId());
for(TransactionalState state: _states) {
state.setData(CURRENT_TX, _currTransaction);
}
}
sync();
}
}
@Override
public void fail(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
if(stored!=null && tx.equals(stored.attempt)) {
_activeTx.tailMap(tx.getTransactionId()).clear();
sync();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
// when it sees the earlier txid it should know to emit nothing
declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
}
if(_active) {
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for(int i=0; i<_maxTransactionActive; i++) {
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
_activeTx.put(curr, new TransactionStatus(attempt));
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
}
}
private boolean isReady(long txid) {
if(_throttler.isThrottled()) return false;
//TODO: make this strategy configurable?... right now it goes if anyone is ready
for(ITridentSpout.BatchCoordinator coord: _coordinators) {
if(coord.isReady(txid)) return true;
}
return false;
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
ret.registerSerialization(TransactionAttempt.class);
return ret;
}
private static enum AttemptStatus {
PROCESSING,
PROCESSED,
COMMITTING
}
private static class TransactionStatus {
TransactionAttempt attempt;
AttemptStatus status;
public TransactionStatus(TransactionAttempt attempt) {
this.attempt = attempt;
this.status = AttemptStatus.PROCESSING;
}
@Override
public String toString() {
return attempt.toString() + " <" + status.toString() + ">";
}
}
private Long nextTransactionId(Long id) {
return id + 1;
}
private Long getStoredCurrTransaction() {
Long ret = INIT_TXID;
for(TransactionalState state: _states) {
Long curr = (Long) state.getData(CURRENT_TX);
if(curr!=null && curr.compareTo(ret) > 0) {
ret = curr;
}
}
return ret;
}
private TreeMap<Long, Integer> getStoredCurrAttempts(long currTransaction, int maxBatches) {
TreeMap<Long, Integer> ret = new TreeMap<Long, Integer>();
for(TransactionalState state: _states) {
Map<Object, Number> attempts = (Map) state.getData(CURRENT_ATTEMPTS);
if(attempts==null) attempts = new HashMap();
for(Entry<Object, Number> e: attempts.entrySet()) {
// this is because json doesn't allow numbers as keys...
// TODO: replace json with a better form of encoding
Number txidObj;
if(e.getKey() instanceof String) {
txidObj = Long.parseLong((String) e.getKey());
} else {
txidObj = (Number) e.getKey();
}
long txid = ((Number) txidObj).longValue();
int attemptId = ((Number) e.getValue()).intValue();
Integer curr = ret.get(txid);
if(curr==null || attemptId > curr) {
ret.put(txid, attemptId);
}
}
}
ret.headMap(currTransaction).clear();
ret.tailMap(currTransaction + maxBatches - 1).clear();
return ret;
}
}