blob: 4719ce618bc4a88a1510f80da7a1c51783c08d08 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.testing;
import static org.apache.storm.Testing.whileTimeout;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.Thrift;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A tracked topology keeps metrics for every bolt and spout.
* This allows a test to know how many tuples have been fully processed.
* Metrics are tracked on a per cluster basis. So only one tracked topology
* should be run in a tracked cluster to avoid conflicts.
*/
public class TrackedTopology {
    private static final Logger LOG = LoggerFactory.getLogger(TrackedTopology.class);
    /** Deep copy of the original topology with every bolt and spout wrapped in a tracker. */
    private final StormTopology topology;
    /** Spout-emitted count that the previous successful {@code trackedWait} call waited for. */
    private final AtomicInteger lastSpoutCommit;
    private final ILocalCluster cluster;

    /**
     * Create a new topology to be tracked.
     *
     * <p>The original topology is deep-copied; in the copy every bolt object is replaced by a
     * {@link BoltTracker} and every spout object by a {@link SpoutTracker}, both created with the
     * cluster's tracked id. The topology passed in is not modified.
     *
     * @param origTopo the original topology.
     * @param cluster a cluster that should have been launched with tracking enabled.
     */
    public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) {
        // Read the tracked id once and reuse it for both the log line and the trackers.
        final String id = cluster.getTrackedId();
        LOG.warn("CLUSTER {} - {}", cluster, id);
        this.cluster = cluster;
        lastSpoutCommit = new AtomicInteger(0);
        topology = origTopo.deepCopy();
        for (Bolt bolt : topology.get_bolts().values()) {
            IRichBolt obj = (IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object());
            bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker(obj, id)));
        }
        for (SpoutSpec spout : topology.get_spouts().values()) {
            IRichSpout obj = (IRichSpout) Thrift.deserializeComponentObject(spout.get_spout_object());
            spout.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker(obj, id)));
        }
    }

    /**
     * Read one counter from the global state registered under the given tracked id.
     *
     * @param id the tracked id whose registered state should be read
     * @param key the name of the counter to read
     * @return the current value of that counter
     */
    @SuppressWarnings("unchecked")
    private static int globalAmt(String id, String key) {
        LOG.warn("Reading tracked metrics for ID {}", id);
        // The registered state is cast to a map of counter name -> counter.
        // NOTE(review): a missing state or key surfaces as a NullPointerException here.
        return ((ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id)).get(key).get();
    }

    /**
     * Get the tracked copy of the topology.
     *
     * @return the deep-copied topology whose components are wrapped in trackers
     */
    public StormTopology getTopology() {
        return topology;
    }

    /**
     * Get the cluster this topology is tracked on.
     *
     * @return the cluster passed to the constructor
     */
    public ILocalCluster getCluster() {
        return cluster;
    }

    /**
     * Wait for 1 tuple to be fully processed, using {@link Testing#TEST_TIMEOUT_MS} as the timeout.
     */
    public void trackedWait() {
        trackedWait(1, Testing.TEST_TIMEOUT_MS);
    }

    /**
     * Wait for amt tuples to be fully processed, using {@link Testing#TEST_TIMEOUT_MS} as the timeout.
     *
     * @param amt the number of additional spout tuples to wait for
     */
    public void trackedWait(int amt) {
        trackedWait(amt, Testing.TEST_TIMEOUT_MS);
    }

    /**
     * Wait for amt tuples to be fully processed, or until timeoutMs has elapsed.
     *
     * <p>The wait ends once the "spout-emitted" counter equals the previous target plus
     * {@code amt} and the "transferred" counter equals the "processed" counter. Between checks the
     * simulated time is advanced by one second and the calling thread sleeps for a random period
     * of less than 200 ms.
     *
     * <p>NOTE(review): the behaviour on timeout is decided by {@code Testing.whileTimeout}, which
     * is not visible here; presumably it fails the test - confirm against that method.
     *
     * @param amt the number of additional spout tuples to wait for
     * @param timeoutMs the maximum time to wait, in milliseconds
     */
    public void trackedWait(int amt, int timeoutMs) {
        final int target = amt + lastSpoutCommit.get();
        final String id = cluster.getTrackedId();
        Random rand = ThreadLocalRandom.current();
        whileTimeout(timeoutMs,
            () -> {
                // Condition is true while the wait should continue.
                int se = globalAmt(id, "spout-emitted");
                int transferred = globalAmt(id, "transferred");
                int processed = globalAmt(id, "processed");
                LOG.info("emitted {} target {} transferred {} processed {}", se, target, transferred, processed);
                return (target != se) || (transferred != processed);
            },
            () -> {
                Time.advanceTimeSecs(1);
                try {
                    Thread.sleep(rand.nextInt(200));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still see it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        // Only reached when the wait completed; remember the target for the next call.
        lastSpoutCommit.set(target);
    }

    /**
     * Read a metric from the tracked cluster (NOT JUST THIS TOPOLOGY).
     *
     * @param key one of "spout-emitted", "processed", or "transferred"
     * @return the amount of that metric
     */
    @SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
    public int globalAmt(String key) {
        return globalAmt(cluster.getTrackedId(), key);
    }
}