| // Copyright 2017 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.twitter.heron.eco.parser; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.InputStream; |
| import java.util.List; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.twitter.heron.eco.definition.BeanDefinition; |
| import com.twitter.heron.eco.definition.BeanReference; |
| import com.twitter.heron.eco.definition.BoltDefinition; |
| import com.twitter.heron.eco.definition.EcoTopologyDefinition; |
| import com.twitter.heron.eco.definition.GroupingDefinition; |
| import com.twitter.heron.eco.definition.PropertyDefinition; |
| import com.twitter.heron.eco.definition.StreamDefinition; |
| |
| import static junit.framework.TestCase.assertNotNull; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| |
| |
| /** |
| * Unit tests for {@link EcoParser} |
| */ |
| public class EcoParserTest { |
| |
| |
| private static final String BOLT_1 = "bolt-1"; |
| private static final String BOLT_2 = "bolt-2"; |
| private static final String YAML_NO_CONFIG_STR = "# Licensed to the Apache Software Foundation" |
| + " (ASF) under one\n" |
| + "# or more contributor license agreements. See the NOTICE file\n" |
| + "# distributed with this work for additional information\n" |
| + "# regarding copyright ownership. The ASF licenses this file\n" |
| + "# to you under the Apache License, Version 2.0 (the\n" |
| + "# \"License\"); you may not use this file except in compliance\n" |
| + "# with the License. You may obtain a copy of the License at\n" |
| + "#\n" |
| + "# http://www.apache.org/licenses/LICENSE-2.0\n" |
| + "#\n" |
| + "# Unless required by applicable law or agreed to in writing, software\n" |
| + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" |
| + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" |
| + "# See the License for the specific language governing permissions and\n" |
| + "# limitations under the License.\n" |
| + "\n" |
| + "---\n" |
| + "\n" |
| + "# topology definition\n" |
| + "# name to be used when submitting\n" |
| + "name: \"yaml-topology\"\n" |
| + "\n" |
| + "# topology configuration\n" |
| + "# this will be passed to the submitter as a map of config options\n" |
| + "#\n" |
| + "# spout definitions\n" |
| + "spouts:\n" |
| + " - id: \"spout-1\"\n" |
| + " className: \"com.twitter.heron.sample.TestWordSpout\"\n" |
| + " parallelism: 1\n" |
| + "\n" |
| + "# bolt definitions\n" |
| + "bolts:\n" |
| + " - id: \"bolt-1\"\n" |
| + " className: \"com.twitter.heron.sample.TestWordCounter\"\n" |
| + " parallelism: 2\n" |
| + "\n" |
| + " - id: \"bolt-2\"\n" |
| + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n" |
| + " parallelism: 1\n" |
| + "\n" |
| + "#stream definitions\n" |
| + "# stream definitions define connections between spouts and bolts.\n" |
| + "# note that such connections can be cyclical\n" |
| + "streams:\n" |
| + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n" |
| + " id: \"connection-1\"\n" |
| + " from: \"spout-1\"\n" |
| + " to: \"bolt-1\"\n" |
| + " grouping:\n" |
| + " type: FIELDS\n" |
| + " args: [\"word\"]\n" |
| + "\n" |
| + " - name: \"bolt-1 --> bolt2\"\n" |
| + " id: \"connection-2\"\n" |
| + " from: \"bolt-1\"\n" |
| + " to: \"bolt-2\"\n" |
| + " grouping:\n" |
| + " type: SHUFFLE"; |
| private static final String YAML_STR = "# Licensed to the Apache Software Foundation" |
| + " (ASF) under one\n" |
| + "# or more contributor license agreements. See the NOTICE file\n" |
| + "# distributed with this work for additional information\n" |
| + "# regarding copyright ownership. The ASF licenses this file\n" |
| + "# to you under the Apache License, Version 2.0 (the\n" |
| + "# \"License\"); you may not use this file except in compliance\n" |
| + "# with the License. You may obtain a copy of the License at\n" |
| + "#\n" |
| + "# http://www.apache.org/licenses/LICENSE-2.0\n" |
| + "#\n" |
| + "# Unless required by applicable law or agreed to in writing, software\n" |
| + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" |
| + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" |
| + "# See the License for the specific language governing permissions and\n" |
| + "# limitations under the License.\n" |
| + "\n" |
| + "---\n" |
| + "\n" |
| + "# topology definition\n" |
| + "# name to be used when submitting\n" |
| + "name: \"yaml-topology\"\n" |
| + "\n" |
| + "# topology configuration\n" |
| + "# this will be passed to the submitter as a map of config options\n" |
| + "#\n" |
| + "config:\n" |
| + " topology.workers: 1\n" |
| + "\n" |
| + "# spout definitions\n" |
| + "spouts:\n" |
| + " - id: \"spout-1\"\n" |
| + " className: \"com.twitter.heron.sample.TestWordSpout\"\n" |
| + " parallelism: 1\n" |
| + "\n" |
| + "# bolt definitions\n" |
| + "bolts:\n" |
| + " - id: \"bolt-1\"\n" |
| + " className: \"com.twitter.heron.sample.TestWordCounter\"\n" |
| + " parallelism: 2\n" |
| + "\n" |
| + " - id: \"bolt-2\"\n" |
| + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n" |
| + " parallelism: 1\n" |
| + "\n" |
| + "#stream definitions\n" |
| + "# stream definitions define connections between spouts and bolts.\n" |
| + "# note that such connections can be cyclical\n" |
| + "streams:\n" |
| + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n" |
| + " id: \"connection-1\"\n" |
| + " from: \"spout-1\"\n" |
| + " to: \"bolt-1\"\n" |
| + " grouping:\n" |
| + " type: FIELDS\n" |
| + " args: [\"word\"]\n" |
| + "\n" |
| + " - name: \"bolt-1 --> bolt2\"\n" |
| + " id: \"connection-2\"\n" |
| + " from: \"bolt-1\"\n" |
| + " to: \"bolt-2\"\n" |
| + " grouping:\n" |
| + " type: SHUFFLE"; |
| |
| private static final String YAML_STR_1 = "# Test ability to wire together shell spouts/bolts\n" |
| + "---\n" |
| + "\n" |
| + "name: \"kafka-topology\"\n" |
| + "\n" |
| + "# Components\n" |
| + "# Components are analagous to Spring beans. They are meant to be used as constructor,\n" |
| + "# property(setter), and builder arguments.\n" |
| + "#\n" |
| + "# for the time being, components must be declared in the order they are referenced\n" |
| + "components:\n" |
| + " - id: \"stringScheme\"\n" |
| + " className: \"org.apache.storm.kafka.StringScheme\"\n" |
| + "\n" |
| + " - id: \"stringMultiScheme\"\n" |
| + " className: \"org.apache.storm.spout.SchemeAsMultiScheme\"\n" |
| + " constructorArgs:\n" |
| + " - ref: \"stringScheme\"\n" |
| + "\n" |
| + " - id: \"zkHosts\"\n" |
| + " className: \"org.apache.storm.kafka.ZkHosts\"\n" |
| + " constructorArgs:\n" |
| + " - \"localhost:2181\"\n" |
| + "\n" |
| + "# Alternative kafka config\n" |
| + "# - id: \"kafkaConfig\"\n" |
| + "# className: \"org.apache.storm.kafka.KafkaConfig\"\n" |
| + "# constructorArgs:\n" |
| + "# # brokerHosts\n" |
| + "# - ref: \"zkHosts\"\n" |
| + "# # topic\n" |
| + "# - \"myKafkaTopic\"\n" |
| + "# # clientId (optional)\n" |
| + "# - \"myKafkaClientId\"\n" |
| + "\n" |
| + " - id: \"spoutConfig\"\n" |
| + " className: \"org.apache.storm.kafka.SpoutConfig\"\n" |
| + " constructorArgs:\n" |
| + " # brokerHosts\n" |
| + " - ref: \"zkHosts\"\n" |
| + " # topic\n" |
| + " - \"myKafkaTopic\"\n" |
| + " # zkRoot\n" |
| + " - \"/kafkaSpout\"\n" |
| + " # id\n" |
| + " - \"myId\"\n" |
| + " properties:\n" |
| + " - name: \"ignoreZkOffsets\"\n" |
| + " value: true\n" |
| + " - name: \"scheme\"\n" |
| + " ref: \"stringMultiScheme\"\n" |
| + "\n" |
| + "\n" |
| + "\n" |
| + "# topology configuration\n" |
| + "# this will be passed to the submitter as a map of config options\n" |
| + "#\n" |
| + "config:\n" |
| + " topology.workers: 1\n" |
| + " # ...\n" |
| + "\n" |
| + "# spout definitions\n" |
| + "spouts:\n" |
| + " - id: \"kafka-spout\"\n" |
| + " className: \"org.apache.storm.kafka.KafkaSpout\"\n" |
| + " constructorArgs:\n" |
| + " - ref: \"spoutConfig\"\n" |
| + "\n" |
| + "# bolt definitions\n" |
| + "bolts:\n" |
| + " - id: \"splitsentence\"\n" |
| + " className: \"org.apache.storm.flux.wrappers.bolts.FluxShellBolt\"\n" |
| + " constructorArgs:\n" |
| + " # command line\n" |
| + " - [\"python\", \"splitsentence.py\"]\n" |
| + " # output fields\n" |
| + " - [\"word\"]\n" |
| + " parallelism: 1\n" |
| + " # ...\n" |
| + "\n" |
| + " - id: \"log\"\n" |
| + " className: \"org.apache.storm.flux.wrappers.bolts.LogInfoBolt\"\n" |
| + " parallelism: 1\n" |
| + " # ...\n" |
| + "\n" |
| + " - id: \"count\"\n" |
| + " className: \"org.apache.storm.testing.TestWordCounter\"\n" |
| + " parallelism: 1\n" |
| + " # ...\n" |
| + "\n" |
| + "#stream definitions\n" |
| + "# stream definitions define connections between spouts and bolts.\n" |
| + "# note that such connections can be cyclical\n" |
| + "# custom stream groupings are also supported\n" |
| + "\n" |
| + "streams:\n" |
| + " - name: \"kafka --> split\" # name isn't used (placeholder for logging, UI, etc.)\n" |
| + " id: \"stream1\"\n" |
| + " from: \"kafka-spout\"\n" |
| + " to: \"splitsentence\"\n" |
| + " grouping:\n" |
| + " type: SHUFFLE\n" |
| + "\n" |
| + " - name: \"split --> count\"\n" |
| + " id: \"stream2\"\n" |
| + " from: \"splitsentence\"\n" |
| + " to: \"count\"\n" |
| + " grouping:\n" |
| + " type: FIELDS\n" |
| + " args: [\"word\"]\n" |
| + "\n" |
| + " - name: \"count --> log\"\n" |
| + " id: \"stream3\"\n" |
| + " from: \"count\"\n" |
| + " to: \"log\"\n" |
| + " grouping:\n" |
| + " type: SHUFFLE"; |
| private EcoParser subject; |
| |
| @Before |
| public void setUpBeforeEachTestCase() { |
| subject = new EcoParser(); |
| } |
| |
| |
| @Test |
| public void testParseFromInputStream_VerifyComponents_MapsAsExpected() throws Exception { |
| |
| InputStream inputStream = new ByteArrayInputStream(YAML_STR_1.getBytes()); |
| |
| EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); |
| List<BeanDefinition> components = topologyDefinition.getComponents(); |
| |
| assertEquals("kafka-topology", topologyDefinition.getName()); |
| assertEquals(4, components.size()); |
| |
| BeanDefinition stringSchemeComponent = components.get(0); |
| assertEquals("stringScheme", stringSchemeComponent.getId()); |
| assertEquals("org.apache.storm.kafka.StringScheme", stringSchemeComponent.getClassName()); |
| |
| |
| BeanDefinition stringMultiSchemeComponent = components.get(1); |
| assertEquals("stringMultiScheme", stringMultiSchemeComponent.getId()); |
| assertEquals("org.apache.storm.spout.SchemeAsMultiScheme", |
| stringMultiSchemeComponent.getClassName()); |
| assertEquals(1, stringMultiSchemeComponent.getConstructorArgs().size()); |
| BeanReference multiStringReference = |
| (BeanReference) stringMultiSchemeComponent.getConstructorArgs().get(0); |
| assertEquals("stringScheme", multiStringReference.getId()); |
| |
| BeanDefinition zkHostsComponent = components.get(2); |
| assertEquals("zkHosts", zkHostsComponent.getId()); |
| assertEquals("org.apache.storm.kafka.ZkHosts", zkHostsComponent.getClassName()); |
| assertEquals(1, zkHostsComponent.getConstructorArgs().size()); |
| assertEquals("localhost:2181", zkHostsComponent.getConstructorArgs().get(0)); |
| |
| BeanDefinition spoutConfigComponent = components.get(3); |
| List<Object> spoutConstructArgs = spoutConfigComponent.getConstructorArgs(); |
| assertEquals("spoutConfig", spoutConfigComponent.getId()); |
| assertEquals("org.apache.storm.kafka.SpoutConfig", spoutConfigComponent.getClassName()); |
| BeanReference spoutBrokerHostComponent = (BeanReference) spoutConstructArgs.get(0); |
| assertEquals("zkHosts", spoutBrokerHostComponent.getId()); |
| assertEquals("myKafkaTopic", spoutConstructArgs.get(1)); |
| assertEquals("/kafkaSpout", spoutConstructArgs.get(2)); |
| List<PropertyDefinition> properties = spoutConfigComponent.getProperties(); |
| assertEquals("ignoreZkOffsets", properties.get(0).getName()); |
| assertEquals(true, properties.get(0).getValue()); |
| assertEquals("scheme", properties.get(1).getName()); |
| assertEquals(true, properties.get(1).isReference()); |
| assertEquals("stringMultiScheme", properties.get(1).getRef()); |
| } |
| |
| @Test |
| public void testParseFromInputStream_VerifyAllButComponents_MapsAsExpected() throws Exception { |
| |
| InputStream inputStream = new ByteArrayInputStream(YAML_STR.getBytes()); |
| |
| EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); |
| |
| assertEquals("yaml-topology", topologyDefinition.getName()); |
| assertEquals(1, topologyDefinition.getConfig().size()); |
| assertEquals(1, topologyDefinition.getConfig().get("topology.workers")); |
| |
| BoltDefinition bolt1 = topologyDefinition.getBolt(BOLT_1); |
| assertNotNull(bolt1); |
| assertEquals(2, bolt1.getParallelism()); |
| assertEquals("com.twitter.heron.sample.TestWordCounter", bolt1.getClassName()); |
| assertEquals(BOLT_1, bolt1.getId()); |
| |
| |
| BoltDefinition bolt2 = topologyDefinition.getBolt(BOLT_2); |
| assertEquals(1, bolt2.getParallelism()); |
| assertEquals("com.twitter.heron.sample.LogInfoBolt", bolt2.getClassName()); |
| assertEquals(BOLT_2, bolt2.getId()); |
| |
| List<StreamDefinition> streamDefinitions = topologyDefinition.getStreams(); |
| StreamDefinition streamDefinitionOne = streamDefinitions.get(0); |
| GroupingDefinition groupingDefinitionOne = streamDefinitionOne.getGrouping(); |
| StreamDefinition streamDefinitionTwo = streamDefinitions.get(1); |
| GroupingDefinition groupingDefinitionTwo = streamDefinitionTwo.getGrouping(); |
| |
| assertEquals(2, streamDefinitions.size()); |
| |
| assertEquals(BOLT_1, streamDefinitionOne.getTo()); |
| assertEquals("spout-1", streamDefinitionOne.getFrom()); |
| assertEquals(GroupingDefinition.Type.FIELDS, groupingDefinitionOne.getType()); |
| assertEquals(1, groupingDefinitionOne.getArgs().size()); |
| assertEquals("word", groupingDefinitionOne.getArgs().get(0)); |
| assertEquals("connection-1", streamDefinitionOne.getId()); |
| |
| assertEquals(BOLT_2, streamDefinitionTwo.getTo()); |
| assertEquals("bolt-1", streamDefinitionTwo.getFrom()); |
| assertEquals(GroupingDefinition.Type.SHUFFLE, groupingDefinitionTwo.getType()); |
| assertEquals("connection-2", streamDefinitionTwo.getId()); |
| assertNull(groupingDefinitionTwo.getArgs()); |
| |
| } |
| |
| @Test |
| public void testPartFromInputStream_NoConfigSpecified_ConfigMapIsEmpty() throws Exception { |
| InputStream inputStream = new ByteArrayInputStream(YAML_NO_CONFIG_STR.getBytes()); |
| |
| EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); |
| |
| assertNotNull(topologyDefinition.getConfig()); |
| assertEquals(0, topologyDefinition.getConfig().size()); |
| } |
| |
| @Test(expected = Exception.class) |
| public void testParseFromInputStream_StreamIsNull_ExceptionThrown() throws Exception { |
| InputStream inputStream = null; |
| EcoTopologyDefinition ecoTopologyDefinition = null; |
| |
| try { |
| ecoTopologyDefinition = subject.parseFromInputStream(inputStream); |
| } finally { |
| assertNull(ecoTopologyDefinition); |
| } |
| } |
| } |