// blob: 734be3619ba9ef9635e0b05d6996738931b5a943 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.coordination;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Constants;
import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.PartialKeyGrouping;
import org.apache.storm.topology.BaseConfigurationDeclarer;
import org.apache.storm.topology.BasicBoltExecutor;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.InputDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Collects one master bolt plus any number of additional bolts and, on
 * {@link #extendTopology(TopologyBuilder)}, registers all of them on a {@link TopologyBuilder}
 * with each bolt wrapped in a {@link CoordinatedBolt}.
 *
 * <p>Input groupings, component configuration and shared-memory requests made through the
 * {@link BoltDeclarer}s handed out by this class are only recorded; they are replayed against the
 * real declarers when {@link #extendTopology(TopologyBuilder)} is called.
 */
public class BatchSubtopologyBuilder {
    /** Non-master bolts keyed by component id; registering the same id twice replaces the earlier entry. */
    Map<String, Component> bolts = new HashMap<>();
    /** The master bolt together with everything declared on it so far. */
    Component masterBolt;
    /** Component id under which the master bolt is registered. */
    String masterId;

    /**
     * Creates a builder whose master bolt is the given basic bolt wrapped in a {@link BasicBoltExecutor}.
     *
     * @param masterBoltId    component id used for the master bolt
     * @param masterBolt      the master bolt implementation
     * @param boltParallelism parallelism for the master bolt, or {@code null} to pass no explicit value
     */
    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
        this.masterBolt = new Component(new BasicBoltExecutor(masterBolt), toParallelism(boltParallelism));
        this.masterId = masterBoltId;
    }

    /**
     * Creates a builder for the given master bolt without an explicit parallelism.
     *
     * @param masterBoltId component id used for the master bolt
     * @param masterBolt   the master bolt implementation
     */
    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
        this(masterBoltId, masterBolt, null);
    }

    /**
     * Returns a declarer that records groupings, configuration and shared memory for the master bolt.
     *
     * @return a new recording declarer bound to the master bolt
     */
    public BoltDeclarer getMasterDeclarer() {
        return new BoltDeclarerImpl(masterBolt);
    }

    /**
     * Registers a batch bolt without an explicit parallelism.
     *
     * @param id   component id of the bolt
     * @param bolt the batch bolt
     * @return a declarer that records the bolt's inputs and configuration
     */
    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
        return setBolt(id, bolt, null);
    }

    /**
     * Registers a batch bolt, wrapped in a {@link BatchBoltExecutor}.
     *
     * @param id          component id of the bolt
     * @param bolt        the batch bolt
     * @param parallelism parallelism for the bolt, or {@code null} to pass no explicit value
     * @return a declarer that records the bolt's inputs and configuration
     */
    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
        return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
    }

    /**
     * Registers a basic bolt without an explicit parallelism.
     *
     * @param id   component id of the bolt
     * @param bolt the basic bolt
     * @return a declarer that records the bolt's inputs and configuration
     */
    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
        return setBolt(id, bolt, null);
    }

    /**
     * Registers a basic bolt, wrapped in a {@link BasicBoltExecutor}.
     *
     * @param id          component id of the bolt
     * @param bolt        the basic bolt
     * @param parallelism parallelism for the bolt, or {@code null} to pass no explicit value
     * @return a declarer that records the bolt's inputs and configuration
     */
    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
    }

    /** Stores the already-wrapped bolt under {@code id} and returns a recording declarer for it. */
    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
        Component component = new Component(bolt, toParallelism(parallelism));
        bolts.put(id, component);
        return new BoltDeclarerImpl(component);
    }

    /** Converts an optional {@link Number} to an optional {@link Integer} via {@link Number#intValue()}. */
    private static Integer toParallelism(Number parallelism) {
        if (parallelism == null) {
            return null;
        }
        return Integer.valueOf(parallelism.intValue());
    }

    /**
     * Adds the master bolt and every registered bolt to {@code builder}, replaying all recorded
     * input declarations, configuration entries and shared-memory requests.
     *
     * @param builder the topology builder to extend
     */
    public void extendTopology(TopologyBuilder builder) {
        declareMaster(builder);
        for (Map.Entry<String, Component> entry : bolts.entrySet()) {
            declareBolt(builder, entry.getKey(), entry.getValue());
        }
    }

    /** Registers the master bolt on {@code builder} and replays everything recorded for it. */
    private void declareMaster(TopologyBuilder builder) {
        BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism);
        for (InputDeclaration decl : masterBolt.declarations) {
            decl.declare(declarer);
        }
        // Shared-memory requests made through getMasterDeclarer() must reach the real declarer,
        // exactly as is done for every other bolt; otherwise they would be silently dropped.
        for (SharedMemory request : masterBolt.sharedMemory) {
            declarer.addSharedMemory(request);
        }
        declarer.addConfigurations(masterBolt.componentConf);
    }

    /** Registers one non-master bolt on {@code builder} and replays everything recorded for it. */
    private void declareBolt(TopologyBuilder builder, String id, Component component) {
        // Computed once: used both for the coordination arguments and for the direct groupings below.
        Set<String> sources = componentBoltSubscriptions(component);

        Map<String, SourceArgs> coordinatedArgs = new HashMap<>();
        for (String source : sources) {
            // The master is registered with SourceArgs.single(); every other source with SourceArgs.all().
            SourceArgs args = source.equals(masterId) ? SourceArgs.single() : SourceArgs.all();
            coordinatedArgs.put(source, args);
        }

        BoltDeclarer input = builder.setBolt(id,
                                             new CoordinatedBolt(component.bolt, coordinatedArgs, null),
                                             component.parallelism);
        for (SharedMemory request : component.sharedMemory) {
            input.addSharedMemory(request);
        }
        if (!component.componentConf.isEmpty()) {
            input.addConfigurations(component.componentConf);
        }
        // Every subscribed component additionally feeds this bolt over the coordinated stream.
        for (String source : sources) {
            input.directGrouping(source, Constants.COORDINATED_STREAM_ID);
        }
        for (InputDeclaration decl : component.declarations) {
            decl.declare(input);
        }
    }

    /** Returns the distinct component ids named by the input declarations recorded on {@code component}. */
    private Set<String> componentBoltSubscriptions(Component component) {
        Set<String> ret = new HashSet<>();
        for (InputDeclaration d : component.declarations) {
            ret.add(d.getComponent());
        }
        return ret;
    }

    /** A recorded input subscription that can be replayed later against a real {@link InputDeclarer}. */
    private interface InputDeclaration {
        /** Applies this subscription to {@code declarer}. */
        void declare(InputDeclarer declarer);

        /** Returns the id of the component this subscription reads from. */
        String getComponent();
    }

    /** {@link InputDeclaration} base for declarations whose source component id is known up front. */
    private abstract static class SourceDeclaration implements InputDeclaration {
        private final String sourceId;

        SourceDeclaration(String sourceId) {
            this.sourceId = sourceId;
        }

        @Override
        public String getComponent() {
            return sourceId;
        }
    }

    /** A bolt plus everything that has been declared on it and not yet applied to a topology. */
    private static class Component {
        public final IRichBolt bolt;
        public final Integer parallelism;
        public final List<InputDeclaration> declarations = new ArrayList<>();
        public final Map<String, Object> componentConf = new HashMap<>();
        public final Set<SharedMemory> sharedMemory = new HashSet<>();

        Component(IRichBolt bolt, Integer parallelism) {
            this.bolt = bolt;
            this.parallelism = parallelism;
        }
    }

    /**
     * {@link BoltDeclarer} that only records what is declared on a {@link Component}; nothing is
     * applied to a topology until the enclosing builder replays the recordings.
     */
    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
        private final Component component;

        BoltDeclarerImpl(Component component) {
            this.component = component;
        }

        @Override
        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.fieldsGrouping(component, fields);
                }
            });
        }

        @Override
        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.fieldsGrouping(component, streamId, fields);
                }
            });
        }

        @Override
        public BoltDeclarer globalGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.globalGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer globalGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.globalGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer shuffleGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.shuffleGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.shuffleGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer localOrShuffleGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localOrShuffleGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.localOrShuffleGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer noneGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.noneGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer noneGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.noneGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer allGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.allGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer allGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.allGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer directGrouping(final String component) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.directGrouping(component);
                }
            });
        }

        @Override
        public BoltDeclarer directGrouping(final String component, final String streamId) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.directGrouping(component, streamId);
                }
            });
        }

        @Override
        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
            return customGrouping(componentId, new PartialKeyGrouping(fields));
        }

        @Override
        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
        }

        @Override
        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.customGrouping(component, grouping);
                }
            });
        }

        @Override
        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
            return addDeclaration(new SourceDeclaration(component) {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.customGrouping(component, streamId, grouping);
                }
            });
        }

        @Override
        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
            // The component id is read from the stream only when asked for, not when the grouping is recorded.
            return addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer declarer) {
                    declarer.grouping(stream, grouping);
                }

                @Override
                public String getComponent() {
                    return stream.get_componentId();
                }
            });
        }

        /** Appends {@code declaration} to the component's recorded inputs and returns this declarer for chaining. */
        private BoltDeclarer addDeclaration(InputDeclaration declaration) {
            component.declarations.add(declaration);
            return this;
        }

        /** Merges {@code conf} into the component's recorded configuration; a {@code null} map is ignored. */
        @Override
        public BoltDeclarer addConfigurations(Map<String, Object> conf) {
            if (conf != null) {
                getComponentConfiguration().putAll(conf);
            }
            return this;
        }

        /** Records a shared-memory request for the component. */
        @Override
        public BoltDeclarer addSharedMemory(SharedMemory request) {
            component.sharedMemory.add(request);
            return this;
        }

        /** Returns the live, mutable configuration map recorded for the component. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return component.componentConf;
        }
    }
}