blob: 90613c95c7f3a69df16a283d61b03a50905e6cec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.flux;
import static org.hamcrest.CoreMatchers.is;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.flux.model.ExecutionContext;
import org.apache.storm.flux.model.TopologyDef;
import org.apache.storm.flux.parser.FluxParser;
import org.apache.storm.flux.test.TestBolt;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.Collections;
import java.util.Properties;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
public class TCKTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testTCK() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testShellComponents() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testBadShellComponents() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_shell_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Unable to find configuration method");
FluxBuilder.buildTopology(context);
}
@Test
public void testKafkaSpoutConfig() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testLoadFromResource() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testHdfs() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testDiamondTopology() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/diamond-topology.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testHbase() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/simple_hbase.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testBadHbase() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_hbase.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Couldn't find a suitable constructor");
FluxBuilder.buildTopology(context);
}
@Test
public void testIncludes() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
assertTrue(topologyDef.getName().equals("include-topology"));
assertTrue(topologyDef.getBolts().size() > 0);
assertTrue(topologyDef.getSpouts().size() > 0);
topology.validate();
}
@Test
public void testTopologySource() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testTopologySourceWithReflection() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testTopologySourceWithConfigParam() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testTopologySourceWithMethodName() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testTridentTopologySource() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidTopologySource() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null, false);
assertFalse("Topology config is invalid.", topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
}
@Test
public void testTopologySourceWithGetMethodName() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
}
@Test
public void testTopologySourceWithConfigMethods() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
// make sure the property was actually set
TestBolt bolt = (TestBolt)context.getBolt("bolt-1");
assertTrue(bolt.getFoo().equals("foo"));
assertTrue(bolt.getBar().equals("bar"));
assertTrue(bolt.getFooBar().equals("foobar"));
assertNotNull(context.getBolt("bolt-2"));
assertNotNull(context.getBolt("bolt-3"));
assertNotNull(context.getBolt("bolt-4"));
assertArrayEquals(new TestBolt.TestClass[] {new TestBolt.TestClass("foo"), new TestBolt.TestClass("bar"), new TestBolt.TestClass("baz")}, bolt.getClasses());
}
@Test
public void testVariableSubstitution() throws Exception {
Properties properties = FluxParser.parseProperties("/configs/test.properties", true);
TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, properties, true);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
assertNotNull(topology);
topology.validate();
// test basic substitution
assertEquals("Property not replaced.",
"substitution-topology",
context.getTopologyDef().getName());
// test environment variable substitution
// $PATH should be defined on most systems
String envPath = System.getenv().get("PATH");
assertEquals("ENV variable not replaced.",
envPath,
context.getTopologyDef().getConfig().get("test.env.value"));
//Test substitution where the target type is List
assertThat("List property is not replaced by the expected value",
Collections.singletonList("A string list"),
is(context.getTopologyDef().getConfig().get("list.property.target")));
}
@Test
public void testTopologyWithInvalidStaticFactoryArgument() throws Exception {
//STORM-3087.
TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_static_factory_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Couldn't find a suitable static method");
FluxBuilder.buildTopology(context);
}
}