| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.coordination; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.storm.Constants; |
| import org.apache.storm.coordination.CoordinatedBolt.SourceArgs; |
| import org.apache.storm.generated.GlobalStreamId; |
| import org.apache.storm.generated.Grouping; |
| import org.apache.storm.generated.SharedMemory; |
| import org.apache.storm.grouping.CustomStreamGrouping; |
| import org.apache.storm.grouping.PartialKeyGrouping; |
| import org.apache.storm.topology.BaseConfigurationDeclarer; |
| import org.apache.storm.topology.BasicBoltExecutor; |
| import org.apache.storm.topology.BoltDeclarer; |
| import org.apache.storm.topology.IBasicBolt; |
| import org.apache.storm.topology.IRichBolt; |
| import org.apache.storm.topology.InputDeclarer; |
| import org.apache.storm.topology.TopologyBuilder; |
| import org.apache.storm.tuple.Fields; |
| |
| |
| public class BatchSubtopologyBuilder { |
| Map<String, Component> bolts = new HashMap<>(); |
| Component masterBolt; |
| String masterId; |
| |
| public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) { |
| Integer p = boltParallelism == null ? null : boltParallelism.intValue(); |
| this.masterBolt = new Component(new BasicBoltExecutor(masterBolt), p); |
| masterId = masterBoltId; |
| } |
| |
| public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) { |
| this(masterBoltId, masterBolt, null); |
| } |
| |
| public BoltDeclarer getMasterDeclarer() { |
| return new BoltDeclarerImpl(masterBolt); |
| } |
| |
| public BoltDeclarer setBolt(String id, IBatchBolt bolt) { |
| return setBolt(id, bolt, null); |
| } |
| |
| public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) { |
| return setBolt(id, new BatchBoltExecutor(bolt), parallelism); |
| } |
| |
| public BoltDeclarer setBolt(String id, IBasicBolt bolt) { |
| return setBolt(id, bolt, null); |
| } |
| |
| public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) { |
| return setBolt(id, new BasicBoltExecutor(bolt), parallelism); |
| } |
| |
| private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) { |
| Integer p = null; |
| if (parallelism != null) { |
| p = parallelism.intValue(); |
| } |
| Component component = new Component(bolt, p); |
| bolts.put(id, component); |
| return new BoltDeclarerImpl(component); |
| } |
| |
| public void extendTopology(TopologyBuilder builder) { |
| BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism); |
| for (InputDeclaration decl : masterBolt.declarations) { |
| decl.declare(declarer); |
| } |
| declarer.addConfigurations(masterBolt.componentConf); |
| for (String id : bolts.keySet()) { |
| Component component = bolts.get(id); |
| Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>(); |
| for (String c : componentBoltSubscriptions(component)) { |
| SourceArgs source; |
| if (c.equals(masterId)) { |
| source = SourceArgs.single(); |
| } else { |
| source = SourceArgs.all(); |
| } |
| coordinatedArgs.put(c, source); |
| } |
| |
| |
| BoltDeclarer input = builder.setBolt(id, |
| new CoordinatedBolt(component.bolt, |
| coordinatedArgs, |
| null), |
| component.parallelism); |
| for (SharedMemory request : component.sharedMemory) { |
| input.addSharedMemory(request); |
| } |
| if (!component.componentConf.isEmpty()) { |
| input.addConfigurations(component.componentConf); |
| } |
| for (String c : componentBoltSubscriptions(component)) { |
| input.directGrouping(c, Constants.COORDINATED_STREAM_ID); |
| } |
| for (InputDeclaration d : component.declarations) { |
| d.declare(input); |
| } |
| } |
| } |
| |
| private Set<String> componentBoltSubscriptions(Component component) { |
| Set<String> ret = new HashSet<>(); |
| for (InputDeclaration d : component.declarations) { |
| ret.add(d.getComponent()); |
| } |
| return ret; |
| } |
| |
| private interface InputDeclaration { |
| void declare(InputDeclarer declarer); |
| |
| String getComponent(); |
| } |
| |
| private static class Component { |
| public final IRichBolt bolt; |
| public final Integer parallelism; |
| public final List<InputDeclaration> declarations = new ArrayList<>(); |
| public final Map<String, Object> componentConf = new HashMap<>(); |
| public final Set<SharedMemory> sharedMemory = new HashSet<>(); |
| |
| Component(IRichBolt bolt, Integer parallelism) { |
| this.bolt = bolt; |
| this.parallelism = parallelism; |
| } |
| } |
| |
| private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer { |
| Component component; |
| |
| BoltDeclarerImpl(Component component) { |
| this.component = component; |
| } |
| |
| @Override |
| public BoltDeclarer fieldsGrouping(final String component, final Fields fields) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.fieldsGrouping(component, fields); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.fieldsGrouping(component, streamId, fields); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer globalGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.globalGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer globalGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.globalGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer shuffleGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.shuffleGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer shuffleGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.shuffleGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer localOrShuffleGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.localOrShuffleGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.localOrShuffleGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer noneGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.noneGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer noneGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.noneGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer allGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.allGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer allGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.allGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer directGrouping(final String component) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.directGrouping(component); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer directGrouping(final String component, final String streamId) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.directGrouping(component, streamId); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { |
| return customGrouping(componentId, new PartialKeyGrouping(fields)); |
| } |
| |
| @Override |
| public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { |
| return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); |
| } |
| |
| @Override |
| public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.customGrouping(component, grouping); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.customGrouping(component, streamId, grouping); |
| } |
| |
| @Override |
| public String getComponent() { |
| return component; |
| } |
| }); |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) { |
| addDeclaration(new InputDeclaration() { |
| @Override |
| public void declare(InputDeclarer declarer) { |
| declarer.grouping(stream, grouping); |
| } |
| |
| @Override |
| public String getComponent() { |
| return stream.get_componentId(); |
| } |
| }); |
| return this; |
| } |
| |
| private void addDeclaration(InputDeclaration declaration) { |
| component.declarations.add(declaration); |
| } |
| |
| @Override |
| public BoltDeclarer addConfigurations(Map<String, Object> conf) { |
| if (conf != null) { |
| getComponentConfiguration().putAll(conf); |
| } |
| return this; |
| } |
| |
| @Override |
| public BoltDeclarer addSharedMemory(SharedMemory request) { |
| component.sharedMemory.add(request); |
| return this; |
| } |
| |
| @Override |
| public Map<String, Object> getComponentConfiguration() { |
| return component.componentConf; |
| } |
| } |
| } |