// blob: de2d6adbbd883c0eefe812a70c6e71b7a8ac687e [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.integration_test.core;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.twitter.heron.api.spout.IRichSpout;
import com.twitter.heron.api.spout.ISpoutOutputCollector;
import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
/**
 * Class that emits tuples until a given condition is met. The rate of emission is throttled and
 * once the condition is met there is an additional fixed time period where new tuples are emitted.
 * Upon completion uploads the set of tuples that were acked to the state server.
 *
 * <p>Threading: the condition is awaited on a separate background thread that is started on the
 * first call to {@link #nextTuple()}. That thread publishes either a quitting deadline or a
 * failure through {@code volatile} fields, which are then read by the thread driving the spout.
 */
class EmitUntilConditionTestSpout extends IntegrationTestSpout {
  private static final long serialVersionUID = 5231279157676404046L;
  private static final Logger LOG = Logger.getLogger(EmitUntilConditionTestSpout.class.getName());
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Sleep applied after each emit, in seconds (see {@link #getPostEmitSleepTimeUnit()}). */
  private final long postEmitSleepSeconds;
  /** How long to keep emitting once the condition has been satisfied, in seconds. */
  private final long postConditionEmitSeconds;
  private final Condition untilCondition;
  /** URL prefix; the task index is appended to it when the tuples are posted. */
  private final String tuplesEmittedStateServerUrl;

  private int taskIndex;
  /** JSON form of the first field of every acked tuple, in ack order. */
  private List<String> tuplesEmitted;
  /** Only touched by the thread calling {@link #nextTuple()}. */
  private boolean conditionCheckerRunning = false;

  // The two fields below are written by the condition checker thread and read by the thread
  // calling nextTuple()/doneEmitting(); volatile guarantees the writes become visible to it.
  private volatile RuntimeException conditionCheckerException = null;
  /** Wall-clock time (epoch millis) at which emitting stops; null until the condition is met. */
  private volatile Long quittingTime = null;

  /**
   * @param delegateSpout the spout whose tuples are emitted
   * @param untilCondition condition that is awaited in the background; once satisfied the
   * spout keeps emitting for a fixed additional period and then reports itself done
   * @param topologyStartedStateUrl passed through to {@link IntegrationTestSpout}
   * @param tuplesEmittedStateServerUrl URL prefix the recorded tuples are posted to
   */
  EmitUntilConditionTestSpout(IRichSpout delegateSpout,
                              Condition untilCondition,
                              String topologyStartedStateUrl,
                              String tuplesEmittedStateServerUrl) {
    // NOTE(review): Integer.MAX_VALUE is presumably an "unbounded" emit limit since termination
    // is driven by doneEmitting() here -- confirm against IntegrationTestSpout.
    super(delegateSpout, Integer.MAX_VALUE, topologyStartedStateUrl);
    this.untilCondition = untilCondition;
    this.tuplesEmittedStateServerUrl = tuplesEmittedStateServerUrl;
    this.postEmitSleepSeconds = 1;
    this.postConditionEmitSeconds = 10;
    this.tuplesEmitted = new ArrayList<>();
  }

  /**
   * Starts a background thread that waits for the condition to be satisfied and then sets the
   * quitting deadline. A {@link RuntimeException} thrown while waiting is stored so that
   * {@link #nextTuple()} can rethrow it.
   */
  private void startConditionChecker() {
    ExecutorService conditionChecker = Executors.newSingleThreadExecutor();
    conditionChecker.execute(new Runnable() {
      @Override
      public void run() {
        try {
          untilCondition.satisfyCondition();
          quittingTime =
              System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(postConditionEmitSeconds);
          // SUPPRESS CHECKSTYLE IllegalCatch
        } catch (RuntimeException e) {
          conditionCheckerException = e;
        }
      }
    });
    // Only one task is ever submitted. shutdown() lets it run to completion and then releases
    // the worker thread instead of leaving an idle non-daemon thread behind.
    conditionChecker.shutdown();
  }

  /**
   * Opens the parent spout with a collector wrapper that posts the recorded tuples once the
   * terminal tuple is emitted, and remembers this task's index for the post URL.
   */
  @Override
  public void open(Map<String, Object> map,
                   TopologyContext topologyContext,
                   SpoutOutputCollector outputCollector) {
    super.open(map, topologyContext, new EmitReportingTestSpoutCollector(outputCollector));
    taskIndex = topologyContext.getThisTaskIndex();
  }

  /**
   * @return true once the condition has been satisfied and the post-condition emit period has
   * elapsed; false while the condition is still pending
   */
  @Override
  protected boolean doneEmitting() {
    // Read the volatile field once so the null check and the comparison see the same value.
    Long deadline = quittingTime;
    return deadline != null && System.currentTimeMillis() >= deadline;
  }

  /**
   * Starts the condition checker on the first call; on later calls rethrows any failure the
   * checker reported. Otherwise delegates to the parent implementation.
   */
  @Override
  public void nextTuple() {
    if (!conditionCheckerRunning) {
      startConditionChecker();
      conditionCheckerRunning = true;
    } else {
      RuntimeException failure = conditionCheckerException;
      if (failure != null) {
        throw failure;
      }
    }
    super.nextTuple();
  }

  @Override
  protected TimeUnit getPostEmitSleepTimeUnit() {
    return TimeUnit.SECONDS;
  }

  @Override
  protected long getPostEmitSleepTime() {
    return this.postEmitSleepSeconds;
  }

  /**
   * Records the first field of the acked tuple, serialized as JSON. A serialization failure is
   * logged and the tuple is left out of the record.
   */
  @Override
  protected void handleAckedMessage(Object messageId, List<Object> tuple) {
    super.handleAckedMessage(messageId, tuple);
    try {
      tuplesEmitted.add(MAPPER.writeValueAsString(tuple.get(0)));
    } catch (JsonProcessingException e) {
      LOG.log(Level.SEVERE,
          "Could not convert map to JSONString: " + tuple.get(0), e);
    }
  }

  /**
   * Collector wrapper that forwards every emit to the delegate and, when the terminal tuple is
   * emitted on the integration test control stream, posts the recorded tuples to the state
   * server.
   */
  private final class EmitReportingTestSpoutCollector extends SpoutOutputCollector {

    private EmitReportingTestSpoutCollector(ISpoutOutputCollector delegate) {
      super(delegate);
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
      List<Integer> result = super.emit(streamId, tuple, messageId);
      handleTuple(streamId, tuple);
      return result;
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
      super.emitDirect(taskId, streamId, tuple, messageId);
      handleTuple(streamId, tuple);
    }

    /** Triggers the upload if the emitted tuple is the terminal tuple on the control stream. */
    private void handleTuple(String streamId, List<Object> tuple) {
      // Reference comparison: only the TERMINAL_TUPLE instance itself triggers the upload.
      if (Constants.INTEGRATION_TEST_CONTROL_STREAM_ID.equals(streamId)
          && tuple == TERMINAL_TUPLE) {
        // All tuples have been handled, push tuplesEmitted to state server
        sendTuplesEmittedToStateServer(tuplesEmitted);
      }
    }

    /**
     * Posts the given tuples to {@code tuplesEmittedStateServerUrl + "_" + taskIndex}.
     *
     * @throws RuntimeException wrapping the cause if the post fails
     */
    private void sendTuplesEmittedToStateServer(List<String> tuples) {
      String url = tuplesEmittedStateServerUrl + "_" + taskIndex;
      try {
        String tuplesSent = tuples.toString();
        LOG.info(String.format("Posting tuples emitted to %s: %s", url, tuplesSent));
        HttpUtils.httpJsonPost(url, tuplesSent);
      } catch (IOException | ParseException e) {
        throw new RuntimeException("Failure posting tuples emitted to " + url, e);
      }
    }
  }
}