Merge pull request #8783 from GGraziadei/8710-flux-component-conf
Allow per-component configuration in Flux topology
diff --git a/docs/Serialization.md b/docs/Serialization.md
index e7af574..8f87ba6 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -90,9 +90,24 @@
#### Flux
-> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism — `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition.
+[Flux](flux.html) supports per-component configuration. In addition to parallelism, number of tasks, memory/CPU load, and groupings, each spout and bolt definition accepts a `config:` block that `FluxBuilder` applies to the underlying declarer via `addConfigurations(...)`. This is the Flux equivalent of `declarer.addConfiguration(...)`, so you can enable compression for just the components that emit large tuples:
-To enable compression for a Flux topology, set it in the topology-level `config:` block:
+```yaml
+spouts:
+ - id: "file-read-spout"
+ className: "org.apache.storm.perf.spout.FileReadSpout"
+ parallelism: 1
+ # enable compression for this spout only
+ config:
+ topology.tuple.compression.enable: true
+
+bolts:
+ - id: "split"
+ className: "org.apache.storm.perf.bolt.SplitSentenceBolt"
+ parallelism: 1
+```
+
+You can also enable it topology-wide by setting it in the topology-level `config:`
```yaml
config:
@@ -100,7 +115,7 @@
topology.tuple.compression.threshold: 1460
```
-Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
+Be aware that the topology-wide form enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
#### Configuration reference
diff --git a/flux/README.md b/flux/README.md
index 8319256..bf41e3d 100644
--- a/flux/README.md
+++ b/flux/README.md
@@ -574,6 +574,17 @@
Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
well.
+In addition to `parallelism`, spout and bolt definitions support the following optional parameters that map directly to
+the underlying Storm `BoltDeclarer`/`SpoutDeclarer`:
+
+| Parameter | Description |
+|---------------------|------------------------------------------------------------------------------------------|
+| `numTasks` | The number of tasks for the component (`setNumTasks`). |
+| `onHeapMemoryLoad` | The on-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
+| `offHeapMemoryLoad` | The off-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
+| `cpuLoad` | The CPU load for resource-aware scheduling (`setCPULoad`). |
+| `config` | A map of configuration parameters applied only to this component (`addConfigurations`). |
+
Shell spout example:
```yaml
@@ -656,6 +667,35 @@
parallelism: 1
# ...
```
+
+### Per-Component Configuration
+In addition to the topology-wide [Topology Config](#topology-config), each spout and bolt can declare its own `config`
+map. These configurations are applied to that component only, via the declarer's `addConfigurations(...)` method,
+following Storm's native support for component-level configuration. This avoids enabling a configuration topology-wide
+when only a single component requires it.
+
+```yaml
+spouts:
+ - id: "sentence-spout"
+ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+ parallelism: 1
+ # configuration applied to this spout only
+ config:
+ topology.max.spout.pending: 1000
+
+bolts:
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ # configuration applied to this bolt only
+ config:
+ topology.tuple.compression.enable: true
+```
+
+Known Storm configuration keys are validated when the topology is built, so an invalid value (for example, a
+non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. Unknown/custom
+keys are not validated and are passed through verbatim; validation runs client-side at build time.
+
## Streams and Stream Groupings
Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
a topology, with an associated Grouping definition.
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 338994b..b715727 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -48,6 +48,7 @@
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.hooks.IWorkerHook;
import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.ComponentConfigurationDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
@@ -57,11 +58,12 @@
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
+import org.apache.storm.validation.ConfigValidation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FluxBuilder {
- private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
/**
@@ -72,7 +74,10 @@
public static Config buildConfig(TopologyDef topologyDef) {
// merge contents of `config` into topology config
Config conf = new Config();
- conf.putAll(topologyDef.getConfig());
+ Map<String, Object> topologyConfig = topologyDef.getConfig();
+ // validate the topology-wide config so invalid values fail fast
+ ConfigValidation.validateFields(topologyConfig);
+ conf.putAll(topologyConfig);
return conf;
}
@@ -185,55 +190,40 @@
for (StreamDef stream : topologyDef.getStreams()) {
Object boltObj = context.getBolt(stream.getTo());
BoltDeclarer declarer = declarers.get(stream.getTo());
- if (boltObj instanceof IRichBolt) {
- if (declarer == null) {
- declarer = builder.setBolt(stream.getTo(),
- (IRichBolt) boltObj,
+ boolean newDeclarer = declarer == null;
+ if (newDeclarer) {
+ declarer = switch (boltObj) {
+ case IRichBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IBasicBolt) {
- if (declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IBasicBolt) boltObj,
+ case IBasicBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IWindowedBolt) {
- if (declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IWindowedBolt) boltObj,
+ case IWindowedBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IStatefulBolt) {
- if (declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IStatefulBolt) boltObj,
+ case IStatefulBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
+ default -> throw new IllegalArgumentException("Class does not appear to be a bolt: "
+ + boltObj.getClass().getName());
+ };
+ // resource and config declarations apply to the bolt as a whole, so only apply them once
+ // when the declarer is first created rather than on every incoming stream
+ BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
+ if (boltDef.getOnHeapMemoryLoad() > -1) {
+ if (boltDef.getOffHeapMemoryLoad() > -1) {
+ declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
+ } else {
+ declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
+ }
}
- } else {
- throw new IllegalArgumentException("Class does not appear to be a bolt: "
- + boltObj.getClass().getName());
- }
+ if (boltDef.getCpuLoad() > -1) {
+ declarer.setCPULoad(boltDef.getCpuLoad());
+ }
+ if (boltDef.getNumTasks() > -1) {
+ declarer.setNumTasks(boltDef.getNumTasks());
+ }
+ applyComponentConfig(boltDef.getConfig(), declarer);
- BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
- if (boltDef.getOnHeapMemoryLoad() > -1) {
- if (boltDef.getOffHeapMemoryLoad() > -1) {
- declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
- } else {
- declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
- }
- }
- if (boltDef.getCpuLoad() > -1) {
- declarer.setCPULoad(boltDef.getCpuLoad());
- }
- if (boltDef.getNumTasks() > -1) {
- declarer.setNumTasks(boltDef.getNumTasks());
+ // persist in declares cache
+ declarers.put(stream.getTo(), declarer);
}
GroupingDef grouping = stream.getGrouping();
@@ -456,6 +446,7 @@
if (sd.getNumTasks() > -1) {
declarer.setNumTasks(sd.getNumTasks());
}
+ applyComponentConfig(sd.getConfig(), declarer);
context.addSpout(sd.getId(), spout);
}
@@ -470,6 +461,14 @@
return (IRichSpout) buildObject(def, context);
}
+ private static void applyComponentConfig(Map<String, Object> config, ComponentConfigurationDeclarer declarer) {
+ if (config == null || config.isEmpty()) {
+ return;
+ }
+ ConfigValidation.validateFields(config);
+ declarer.addConfigurations(config);
+ }
+
/**
* Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
* Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
index cd09d05..1214957 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@ -18,6 +18,9 @@
package org.apache.storm.flux.model;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Abstract parent class of component definitions.
* (spouts/bolts)
@@ -30,6 +33,8 @@
private int onHeapMemoryLoad = -1;
private int offHeapMemoryLoad = -1;
private int cpuLoad = -1;
+ // per-component configuration
+ private Map<String, Object> config = new HashMap<>();
public int getParallelism() {
return parallelism;
@@ -70,4 +75,12 @@
public void setCpuLoad(int cpuLoad) {
this.cpuLoad = cpuLoad;
}
+
+ public Map<String, Object> getConfig() {
+ return config;
+ }
+
+ public void setConfig(Map<String, Object> config) {
+ this.config = config;
+ }
}
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
index 4f5428e..f3b82dc 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
@@ -17,8 +17,15 @@
*/
package org.apache.storm.flux;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.flux.model.TopologyDef;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FluxBuilderTest {
@@ -29,4 +36,26 @@
assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
}
+
+ @Test
+ public void testBuildConfigAcceptsValidTopologyConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(Config.TOPOLOGY_WORKERS, 4);
+ TopologyDef topologyDef = new TopologyDef();
+ topologyDef.setConfig(config);
+
+ Config result = FluxBuilder.buildConfig(topologyDef);
+ assertEquals(4, result.get(Config.TOPOLOGY_WORKERS));
+ }
+
+ @Test
+ public void testBuildConfigRejectsInvalidTopologyConfig() {
+ // topology.workers must be a positive integer; a String value is invalid
+ Map<String, Object> config = new HashMap<>();
+ config.put(Config.TOPOLOGY_WORKERS, "not-a-number");
+ TopologyDef topologyDef = new TopologyDef();
+ topologyDef.setConfig(config);
+
+ assertThrows(IllegalArgumentException.class, () -> FluxBuilder.buildConfig(topologyDef));
+ }
}
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 2fd755e..2135046 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -23,11 +23,14 @@
import org.apache.storm.flux.model.TopologyDef;
import org.apache.storm.flux.parser.FluxParser;
import org.apache.storm.flux.test.TestBolt;
+import org.apache.storm.shade.net.minidev.json.JSONValue;
+import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
public class TCKTest {
@@ -104,6 +107,97 @@
}
@Test
+ public void testComponentConfig() throws Exception {
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-test.yaml",
+ false, true, null, false);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ assertNotNull(topology);
+ topology.validate();
+
+ Map<String, Object> spoutConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_spouts().get("spout-1").get_common().get_json_conf());
+ assertEquals(Boolean.TRUE, spoutConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+
+ Map<String, Object> boltConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_bolts().get("bolt-1").get_common().get_json_conf());
+ assertEquals(Boolean.FALSE, boltConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+ }
+
+ @Test
+ public void testComponentConfigOverridesTopologyConfig() throws Exception {
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-override-test.yaml",
+ false, true, null, false);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ assertNotNull(topology);
+ topology.validate();
+
+ // the topology-level config carries the original value
+ assertEquals(Boolean.TRUE, conf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+
+ // evaluate the effective config the way a worker does: topology config overlaid with the
+ // per-component config
+ Map<String, Object> boltComponentConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_bolts().get("bolt-1").get_common().get_json_conf());
+ Map<String, Object> effectiveBoltConf = Utils.merge(conf, boltComponentConf);
+ assertEquals(Boolean.FALSE, effectiveBoltConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+
+ Map<String, Object> spoutComponentConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_spouts().get("spout-1").get_common().get_json_conf());
+ Map<String, Object> effectiveSpoutConf = Utils.merge(conf, spoutComponentConf);
+ assertEquals(Boolean.TRUE, effectiveSpoutConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+ }
+
+ @Test
+ public void testComponentConfigWithInvalidValue() throws Exception {
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-invalid-test.yaml", false,
+ true, null, false);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+
+ IllegalArgumentException expectedException = assertThrows(IllegalArgumentException.class,
+ () -> FluxBuilder.buildTopology(context));
+ assertTrue(expectedException.getMessage().contains("must be of type"));
+ assertTrue(expectedException.getMessage().contains("Boolean"));
+ }
+
+ @Test
+ public void testComponentConfigWithNotRegisteredKey() throws Exception {
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-invalid-key-test.yaml", false,
+ true, null, false);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ Map<String, Object> boltConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_bolts().get("bolt-1").get_common().get_json_conf());
+ // properties added for custom purposes and persisted in the component conf
+ assertTrue(boltConf.containsKey("MY.INAVLID.KEY"));
+ }
+
+ @Test
+ public void testComponentConfigMissing() throws Exception {
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-missing-test.yaml", false,
+ true, null, false);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ assertNotNull(topology);
+ topology.validate();
+
+ Map<String, Object> spoutConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_spouts().get("spout-1").get_common().get_json_conf());
+ assertTrue(spoutConf == null || !spoutConf.containsKey(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+
+ Map<String, Object> boltConf = (Map<String, Object>) JSONValue.parse(
+ topology.get_bolts().get("bolt-1").get_common().get_json_conf());
+ assertTrue(boltConf == null || !boltConf.containsKey(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE));
+ }
+
+ @Test
public void testBadHbase() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_hbase.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
diff --git a/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml
new file mode 100644
index 0000000..7275a84
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+name: "component-config-invalid-topology"
+
+config:
+ topology.workers: 1
+
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ config:
+ MY.INAVLID.KEY: "none"
+
+streams:
+ - from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE
diff --git a/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml
new file mode 100644
index 0000000..260f501
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+name: "component-config-invalid-topology"
+
+config:
+ topology.workers: 1
+
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ config:
+ topology.tuple.compression.enable: "not-a-boolean"
+
+streams:
+ - from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE
diff --git a/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml
new file mode 100644
index 0000000..1ce783a
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+name: "component-config-missing-topology"
+
+config:
+ topology.workers: 1
+
+# neither the spout nor the bolt declares a per-component config block
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+streams:
+ - from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE
diff --git a/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml
new file mode 100644
index 0000000..7d08314
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+name: "component-config-override-topology"
+
+# topology configuration (global scope)
+config:
+ topology.workers: 1
+ topology.tuple.compression.enable: true
+
+# spout definitions
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+ # no per-component override: the effective config inherits the topology-level value
+
+# bolt definitions
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ # per-component configuration overrides the topology-level value for this bolt only
+ config:
+ topology.tuple.compression.enable: false
+
+#stream definitions
+streams:
+ - from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE
diff --git a/flux/flux-core/src/test/resources/configs/component-config-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-test.yaml
new file mode 100644
index 0000000..042e2b5
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/component-config-test.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+name: "component-config-topology"
+
+# topology configuration (global scope)
+config:
+ topology.workers: 1
+
+# spout definitions
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+ # per-component configuration applied to this spout only
+ config:
+ topology.tuple.compression.enable: true
+
+# bolt definitions
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ # per-component configuration applied to this bolt only
+ config:
+ topology.tuple.compression.enable: false
+
+#stream definitions
+streams:
+ - from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE