/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.batch.impl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.batch.util.BatchDef;
import com.alibaba.jstorm.batch.util.BatchStatus;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
/**
 * Trigger spout that drives batch processing in strong sequence: each batch
 * walks through its status streams step by step, and commits are executed
 * strictly in batch-id order.
 *
 * @author zhongyan.feng
 */
public class BatchSpoutTrigger implements IRichSpout {
private static final long serialVersionUID = 7215109169247425954L;
private static final Logger LOG = LoggerFactory.getLogger(BatchSpoutTrigger.class);
private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
private transient ClusterState zkClient;
private transient SpoutOutputCollector collector;
private static final String ZK_NODE_PATH = "/trigger";
// shared across all trigger instances in this worker; updated under the
// BatchSpoutMsgId.class lock in mkMsgId()
private static BatchId currentBatchId = null;
private Map conf;
private String taskName;
private IntervalCheck intervalCheck;
/**
 * Restores the last committed batch id from ZooKeeper (if present), then
 * seeds the queue with one msgId per allowed in-flight batch.
 *
 * @throws Exception if the ZooKeeper node cannot be read
 */
public void initMsgId() throws Exception {
Long zkMsgId = null;
byte[] data = zkClient.get_data(ZK_NODE_PATH, false);
if (data != null) {
String value = new String(data);
try {
zkMsgId = Long.valueOf(value);
LOG.info("ZK msgId:" + zkMsgId);
} catch (Exception e) {
LOG.warn("Failed to parse msgId from ZK value " + value, e);
}
}
if (zkMsgId != null) {
BatchId.updateId(zkMsgId);
}
int maxSpoutPending = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
for (int i = 0; i < maxSpoutPending; i++) {
BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance();
if (currentBatchId == null) {
currentBatchId = msgId.getBatchId();
}
batchQueue.offer(msgId);
LOG.info("Push into queue, " + msgId);
}
}
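/**
 * Storm lifecycle hook: connects to ZooKeeper, restores the last batch id,
 * and pre-fills the queue with up to TOPOLOGY_MAX_SPOUT_PENDING batches.
 */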
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>();
this.collector = collector;
this.conf = conf;
taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
intervalCheck = new IntervalCheck();
try {
zkClient = BatchCommon.getZkClient(conf);
initMsgId();
} catch (Exception e) {
LOG.error("", e);
throw new RuntimeException("Failed to init");
}
LOG.info("Successfully open " + taskName);
}
@Override
public void close() {
zkClient.close();
}
@Override
public void activate() {
LOG.info("Activate " + taskName);
}
@Override
public void deactivate() {
LOG.info("Deactivate " + taskName);
}
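/**
 * Maps a batch status to the stream its trigger tuple is emitted on; each
 * status owns a dedicated stream (see declareOutputFields).
 */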
protected String getStreamId(BatchStatus batchStatus) {
if (batchStatus == BatchStatus.COMPUTING) {
return BatchDef.COMPUTING_STREAM_ID;
} else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
return BatchDef.PREPARE_STREAM_ID;
} else if (batchStatus == BatchStatus.COMMIT) {
return BatchDef.COMMIT_STREAM_ID;
} else if (batchStatus == BatchStatus.POST_COMMIT) {
return BatchDef.POST_STREAM_ID;
} else if (batchStatus == BatchStatus.REVERT_COMMIT) {
return BatchDef.REVERT_STREAM_ID;
} else {
LOG.error("Occur unkonw type BatchStatus " + batchStatus);
throw new RuntimeException();
}
}
protected boolean isCommitStatus(BatchStatus batchStatus) {
return batchStatus == BatchStatus.COMMIT || batchStatus == BatchStatus.REVERT_COMMIT;
}
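/**
 * Commits must run in strict batch-id order: a COMMIT/REVERT_COMMIT msgId
 * whose batch id is still ahead of currentBatchId has to wait for older
 * batches to finish, so nextTuple re-queues it instead of emitting it.
 */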
protected boolean isCommitWait(BatchSpoutMsgId msgId) {
if (!isCommitStatus(msgId.getBatchStatus())) {
return false;
}
// only commit statuses reach here; the batch may commit only once
// currentBatchId has caught up with its batch id
if (currentBatchId.getId() >= msgId.getBatchId().getId()) {
return false;
}
return true;
}
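/**
 * Polls one pending msgId: commits that must wait are re-queued, everything
 * else is emitted on its status stream; if no task subscribes to that
 * stream, the status is advanced locally via forward().
 */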
@Override
public void nextTuple() {
BatchSpoutMsgId msgId = null;
try {
msgId = batchQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.error("Failed to poll batch queue", e);
// restore the interrupt flag so the executor can see the interruption
Thread.currentThread().interrupt();
}
if (msgId == null) {
return;
}
if (isCommitWait(msgId)) {
// this commit must wait for older batches; put it back and retry later
batchQueue.offer(msgId);
if (intervalCheck.check()) {
LOG.info("Current msgId " + msgId + ", but current commit BatchId is " + currentBatchId);
} else {
// between intervals, log at DEBUG to avoid flooding the log
LOG.debug("Current msgId " + msgId + ", but current commit BatchId is " + currentBatchId);
}
return;
}
String streamId = getStreamId(msgId.getBatchStatus());
List<Integer> outTasks = collector.emit(streamId, new Values(msgId.getBatchId()), msgId);
if (outTasks.isEmpty()) {
// no task subscribes to this stream, so no ack will ever arrive;
// advance the status locally instead of waiting forever
forward(msgId);
}
return;
}
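/**
 * Finishes a batch: checkpoints the current batch id to ZooKeeper, advances
 * currentBatchId, and queues a fresh msgId so the number of in-flight
 * batches stays constant.
 */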
protected void mkMsgId(BatchSpoutMsgId oldMsgId) {
synchronized (BatchSpoutMsgId.class) {
if (currentBatchId.getId() <= oldMsgId.getBatchId().getId()) {
// normal case: this is the newest finished batch, so checkpoint the
// current batch id to ZK before advancing it
byte[] data = String.valueOf(currentBatchId.getId()).getBytes();
try {
zkClient.set_data(ZK_NODE_PATH, data);
} catch (Exception e) {
LOG.error("Failed to update to ZK " + oldMsgId, e);
}
currentBatchId = BatchId.incBatchId(oldMsgId.getBatchId());
} else {
// a batch with a bigger id already failed and advanced currentBatchId;
// the old msgId ends up here when it finally finishes, nothing to do
}
}
BatchSpoutMsgId newMsgId = BatchSpoutMsgId.mkInstance();
batchQueue.offer(newMsgId);
StringBuilder sb = new StringBuilder();
sb.append("Create new BatchId, ");
sb.append("old:").append(oldMsgId);
sb.append(", new:").append(newMsgId);
sb.append(", currentBatchId:").append(currentBatchId);
LOG.info(sb.toString());
}
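/**
 * Advances a msgId to the next status in the batch state machine; a null
 * next status means the batch has completed and a new one is started.
 */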
protected void forward(BatchSpoutMsgId msgId) {
BatchStatus status = msgId.getBatchStatus();
BatchStatus newStatus = status.forward();
if (newStatus == null) {
// the batch has passed through every status; finish it and start a new one
mkMsgId(msgId);
LOG.info("Finish old batch " + msgId);
} else {
msgId.setBatchStatus(newStatus);
batchQueue.offer(msgId);
LOG.info("Forward batch " + msgId);
}
}
@Override
public void ack(Object msgId) {
if (msgId instanceof BatchSpoutMsgId) {
forward((BatchSpoutMsgId) msgId);
} else {
LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" + msgId);
}
}
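/**
 * Moves a failed msgId to its error status: either the batch is abandoned
 * (ERROR) and a new one started, or it is re-queued to run the revert path.
 */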
protected void handleFail(BatchSpoutMsgId msgId) {
LOG.info("Failed batch " + msgId);
BatchStatus status = msgId.getBatchStatus();
BatchStatus newStatus = status.error();
if (newStatus == BatchStatus.ERROR) {
// the batch cannot be recovered; abandon it and start a fresh one
mkMsgId(msgId);
} else {
msgId.setBatchStatus(newStatus);
batchQueue.offer(msgId);
}
}
@Override
public void fail(Object msgId) {
if (msgId instanceof BatchSpoutMsgId) {
handleFail((BatchSpoutMsgId) msgId);
} else {
LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" + msgId);
return;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields("BatchId"));
declarer.declareStream(BatchDef.PREPARE_STREAM_ID, new Fields("BatchId"));
declarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId"));
declarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId"));
declarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> map = new HashMap<String, Object>();
// run the spout callbacks single-threaded so nextTuple/ack/fail do not race
ConfigExtension.setSpoutSingleThread(map, true);
return map;
}
}
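/*
 * Lifecycle sketch, inferred from the stream ids above (the authoritative
 * transition order lives in BatchStatus.forward()/error(), not in this file):
 *
 *   COMPUTING -> PREPARE_COMMIT -> COMMIT -> POST_COMMIT -> new batch
 *                                    \
 *                                     on fail: REVERT_COMMIT -> new batch
 *
 * Each arrow corresponds to one ack of the trigger tuple (or a local
 * forward() when a status stream has no subscribers).
 */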