blob: 2d786b199d79017d680a114ce0c48a98a28b189b [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.api.topology;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.google.protobuf.ByteString;
import com.twitter.heron.api.bolt.IRichBolt;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.grouping.CustomStreamGrouping;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.utils.Utils;
/**
 * Declarer for a single bolt of a topology.
 * <p>
 * It records the input streams the bolt subscribes to (one entry per grouping call) together
 * with the output streams the bolt itself declares, and writes both into the topology
 * protobuf via {@link #dump(TopologyAPI.Topology.Builder)}. Every grouping method returns
 * this declarer so that calls can be chained.
 */
public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
  // Collects the output stream schemas declared by the bolt in the constructor.
  private final OutputFieldsGetter output;
  // Input subscriptions, in the order the grouping methods were invoked.
  private final List<TopologyAPI.InputStream.Builder> inputs;

  /**
   * Creates a declarer for the given bolt and immediately asks the bolt to declare its
   * output fields.
   *
   * @param name the component name, forwarded to the base declarer
   * @param bolt the bolt being declared; its {@code declareOutputFields} is invoked here
   * @param taskParallelism the parallelism value, forwarded to the base declarer
   */
  public BoltDeclarer(String name, IRichBolt bolt, Number taskParallelism) {
    super(name, bolt, taskParallelism);
    inputs = new LinkedList<TopologyAPI.InputStream.Builder>();
    output = new OutputFieldsGetter();
    bolt.declareOutputFields(output);
  }

  /**
   * @return this declarer
   */
  @Override
  public BoltDeclarer returnThis() {
    return this;
  }

  /**
   * Appends this bolt (component data, inputs and outputs) to the topology being built.
   *
   * @param bldr the topology builder that receives the bolt definition
   */
  public void dump(TopologyAPI.Topology.Builder bldr) {
    TopologyAPI.Bolt.Builder boltBldr = TopologyAPI.Bolt.newBuilder();

    // Component-level data is filled in by the base declarer.
    TopologyAPI.Component.Builder cbldr = TopologyAPI.Component.newBuilder();
    super.dump(cbldr);
    boltBldr.setComp(cbldr);

    for (TopologyAPI.InputStream.Builder input : inputs) {
      boltBldr.addInputs(input);
    }

    // One output stream per declared stream id, owned by this component.
    Map<String, TopologyAPI.StreamSchema.Builder> outs = output.getFieldsDeclaration();
    for (Map.Entry<String, TopologyAPI.StreamSchema.Builder> entry : outs.entrySet()) {
      TopologyAPI.StreamId.Builder sbldr = TopologyAPI.StreamId.newBuilder();
      sbldr.setId(entry.getKey());
      sbldr.setComponentName(getName());

      TopologyAPI.OutputStream.Builder obldr = TopologyAPI.OutputStream.newBuilder();
      obldr.setStream(sbldr);
      obldr.setSchema(entry.getValue());
      boltBldr.addOutputs(obldr);
    }

    bldr.addBolts(boltBldr);
  }

  /**
   * Subscribes to the default stream of {@code componentName} with fields grouping.
   *
   * @param componentName the upstream component
   * @param fields the fields to group on
   * @return this declarer
   */
  public BoltDeclarer fieldsGrouping(String componentName, Fields fields) {
    return fieldsGrouping(componentName, Utils.DEFAULT_STREAM_ID, fields);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with fields grouping.
   * Each grouping field is recorded as a key of type {@code OBJECT}.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @param fields the fields to group on
   * @return this declarer
   */
  public BoltDeclarer fieldsGrouping(String componentName, String streamId, Fields fields) {
    TopologyAPI.InputStream.Builder bldr =
        newInputStream(componentName, streamId, TopologyAPI.Grouping.FIELDS);

    TopologyAPI.StreamSchema.Builder gfbldr = TopologyAPI.StreamSchema.newBuilder();
    for (int i = 0; i < fields.size(); ++i) {
      TopologyAPI.StreamSchema.KeyType.Builder ktBldr =
          TopologyAPI.StreamSchema.KeyType.newBuilder();
      ktBldr.setKey(fields.get(i));
      ktBldr.setType(TopologyAPI.Type.OBJECT);
      gfbldr.addKeys(ktBldr);
    }
    bldr.setGroupingFields(gfbldr);

    return grouping(bldr);
  }

  /**
   * Subscribes to the default stream of {@code componentName} with global grouping.
   *
   * @param componentName the upstream component
   * @return this declarer
   */
  public BoltDeclarer globalGrouping(String componentName) {
    return globalGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with global grouping,
   * which is recorded in the protobuf as the {@code LOWEST} grouping type.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return this declarer
   */
  public BoltDeclarer globalGrouping(String componentName, String streamId) {
    return grouping(newInputStream(componentName, streamId, TopologyAPI.Grouping.LOWEST));
  }

  /**
   * Subscribes to the default stream of {@code componentName} with shuffle grouping.
   *
   * @param componentName the upstream component
   * @return this declarer
   */
  public BoltDeclarer shuffleGrouping(String componentName) {
    return shuffleGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with shuffle grouping.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return this declarer
   */
  public BoltDeclarer shuffleGrouping(String componentName, String streamId) {
    return grouping(newInputStream(componentName, streamId, TopologyAPI.Grouping.SHUFFLE));
  }

  /**
   * Subscribes to the default stream of {@code componentName}; equivalent to
   * {@link #shuffleGrouping(String)}.
   *
   * @param componentName the upstream component
   * @return this declarer
   */
  public BoltDeclarer localOrShuffleGrouping(String componentName) {
    return localOrShuffleGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName}; equivalent to
   * {@link #shuffleGrouping(String, String)}.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return this declarer
   */
  public BoltDeclarer localOrShuffleGrouping(String componentName, String streamId) {
    // Heron tasks are process based, thus there's no concept of local (within process)
    // shuffling. So the local grouping strategy is mapped to shuffleGrouping.
    return shuffleGrouping(componentName, streamId);
  }

  /**
   * Subscribes to the default stream of {@code componentName} with the {@code NONE} grouping.
   *
   * @param componentName the upstream component
   * @return this declarer
   */
  public BoltDeclarer noneGrouping(String componentName) {
    return noneGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with the {@code NONE}
   * grouping.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return this declarer
   */
  public BoltDeclarer noneGrouping(String componentName, String streamId) {
    return grouping(newInputStream(componentName, streamId, TopologyAPI.Grouping.NONE));
  }

  /**
   * Subscribes to the default stream of {@code componentName} with the {@code ALL} grouping.
   *
   * @param componentName the upstream component
   * @return this declarer
   */
  public BoltDeclarer allGrouping(String componentName) {
    return allGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with the {@code ALL}
   * grouping.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return this declarer
   */
  public BoltDeclarer allGrouping(String componentName, String streamId) {
    return grouping(newInputStream(componentName, streamId, TopologyAPI.Grouping.ALL));
  }

  /**
   * Direct grouping on the default stream; not implemented.
   *
   * @param componentName the upstream component
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  public BoltDeclarer directGrouping(String componentName) {
    return directGrouping(componentName, Utils.DEFAULT_STREAM_ID);
  }

  /**
   * Direct grouping on the given stream; not implemented.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  public BoltDeclarer directGrouping(String componentName, String streamId) {
    // TODO:- revisit this
    // UnsupportedOperationException is a RuntimeException, so existing handlers still match.
    throw new UnsupportedOperationException("direct Grouping not implemented");
  }

  /**
   * Subscribes to the default stream of {@code componentName} with a user supplied grouping.
   *
   * @param componentName the upstream component
   * @param grouping the custom grouping implementation
   * @return this declarer
   */
  public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouping) {
    return customGrouping(componentName, Utils.DEFAULT_STREAM_ID, grouping);
  }

  /**
   * Subscribes to stream {@code streamId} of {@code componentName} with a user supplied
   * grouping. The grouping object is serialized with {@code Utils.serialize} and embedded in
   * the input stream definition, tagged as a {@code JAVA_OBJECT}.
   *
   * @param componentName the upstream component
   * @param streamId the upstream stream id
   * @param grouping the custom grouping implementation
   * @return this declarer
   */
  public BoltDeclarer customGrouping(
      String componentName,
      String streamId,
      CustomStreamGrouping grouping) {
    TopologyAPI.InputStream.Builder bldr =
        newInputStream(componentName, streamId, TopologyAPI.Grouping.CUSTOM);
    bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
    bldr.setCustomGroupingObject(ByteString.copyFrom(Utils.serialize(grouping)));
    return grouping(bldr);
  }

  /**
   * Builds an input stream definition that points at the given upstream stream and carries
   * the given grouping type. Shared by all grouping methods.
   */
  private static TopologyAPI.InputStream.Builder newInputStream(
      String componentName,
      String streamId,
      TopologyAPI.Grouping gtype) {
    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
    bldr.setStream(
        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
    bldr.setGtype(gtype);
    return bldr;
  }

  /**
   * Records an input stream definition and returns this declarer for chaining.
   */
  private BoltDeclarer grouping(TopologyAPI.InputStream.Builder stream) {
    inputs.add(stream);
    return this;
  }
}