| package com.yahoo.labs.samoa.topology.impl; |
| |
| /* |
| * #%L |
| * SAMOA |
| * %% |
| * Copyright (C) 2013 Yahoo! Inc. |
| * %% |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * #L% |
| */ |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.samza.system.OutgoingMessageEnvelope; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.task.MessageCollector; |
| |
| import com.yahoo.labs.samoa.core.ContentEvent; |
| import com.yahoo.labs.samoa.topology.IProcessingItem; |
| import com.yahoo.labs.samoa.topology.AbstractStream; |
| import com.yahoo.labs.samoa.utils.PartitioningScheme; |
| import com.yahoo.labs.samoa.utils.StreamDestination; |
| |
| /** |
| * Stream for SAMOA on Samza |
| * |
| * @author Anh Thu Vu |
| */ |
| public class SamzaStream extends AbstractStream implements Serializable { |
| |
| /** |
| * |
| */ |
| private static final long serialVersionUID = 1L; |
| |
| private static final String DEFAULT_SYSTEM_NAME = "kafka"; |
| |
| private List<SamzaSystemStream> systemStreams; |
| private transient MessageCollector collector; |
| private String systemName; |
| |
| /* |
| * Constructor |
| */ |
| public SamzaStream(IProcessingItem sourcePi) { |
| super(sourcePi); |
| this.systemName = DEFAULT_SYSTEM_NAME; |
| // Get name/id for this stream |
| SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi; |
| int index = samzaPi.addOutputStream(this); |
| this.setStreamId(samzaPi.getName()+"-"+Integer.toString(index)); |
| // init list of SamzaSystemStream |
| systemStreams = new ArrayList<SamzaSystemStream>(); |
| } |
| |
| /* |
| * System name (Kafka) |
| */ |
| public void setSystemName(String systemName) { |
| this.systemName = systemName; |
| for (SamzaSystemStream systemStream:systemStreams) { |
| systemStream.setSystem(systemName); |
| } |
| } |
| |
| public String getSystemName() { |
| return this.systemName; |
| } |
| |
| /* |
| * Add the PI to the list of destinations. |
| * Return the name of the corresponding SystemStream. |
| */ |
| public SamzaSystemStream addDestination(StreamDestination destination) { |
| PartitioningScheme scheme = destination.getPartitioningScheme(); |
| int parallelism = destination.getParallelism(); |
| |
| SamzaSystemStream resultStream = null; |
| for (int i=0; i<systemStreams.size(); i++) { |
| // There is an existing SystemStream that matches the settings. |
| // Do not create a new one |
| if (systemStreams.get(i).isSame(scheme, parallelism)) { |
| resultStream = systemStreams.get(i); |
| } |
| } |
| |
| // No existing SystemStream match the requirement |
| // Create a new one |
| if (resultStream == null) { |
| String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size()); |
| resultStream = new SamzaSystemStream(this.systemName,topicName,scheme,parallelism); |
| systemStreams.add(resultStream); |
| } |
| |
| return resultStream; |
| } |
| |
| public void setCollector(MessageCollector collector) { |
| this.collector = collector; |
| } |
| |
| public MessageCollector getCollector(){ |
| return this.collector; |
| } |
| |
| public void onCreate() { |
| for (SamzaSystemStream stream:systemStreams) { |
| stream.initSystemStream(); |
| } |
| } |
| |
| /* |
| * Implement Stream interface |
| */ |
| @Override |
| public void put(ContentEvent event) { |
| for (SamzaSystemStream stream:systemStreams) { |
| stream.send(collector,event); |
| } |
| } |
| |
| public List<SamzaSystemStream> getSystemStreams() { |
| return this.systemStreams; |
| } |
| |
| /** |
| * SamzaSystemStream wrap around a Samza's SystemStream |
| * It contains the info to create a Samza stream during the |
| * constructing process of the topology and |
| * will create the actual Samza stream when the topology is submitted |
| * (invoking initSystemStream()) |
| * |
| * @author Anh Thu Vu |
| */ |
| public static class SamzaSystemStream implements Serializable { |
| /** |
| * |
| */ |
| private static final long serialVersionUID = 1L; |
| private String system; |
| private String stream; |
| private PartitioningScheme scheme; |
| private int parallelism; |
| |
| private transient SystemStream actualSystemStream = null; |
| |
| /* |
| * Constructors |
| */ |
| public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) { |
| this.system = system; |
| this.stream = stream; |
| this.scheme = scheme; |
| this.parallelism = parallelism; |
| } |
| |
| public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) { |
| this(system, stream, scheme, 1); |
| } |
| |
| /* |
| * Setters |
| */ |
| public void setSystem(String system) { |
| this.system = system; |
| } |
| |
| /* |
| * Getters |
| */ |
| public String getSystem() { |
| return this.system; |
| } |
| |
| public String getStream() { |
| return this.stream; |
| } |
| |
| public PartitioningScheme getPartitioningScheme() { |
| return this.scheme; |
| } |
| |
| public int getParallelism() { |
| return this.parallelism; |
| } |
| |
| public boolean isSame(PartitioningScheme scheme, int parallelismHint) { |
| return (this.scheme == scheme && this.parallelism == parallelismHint); |
| } |
| |
| /* |
| * Init the actual Samza stream |
| */ |
| public void initSystemStream() { |
| actualSystemStream = new SystemStream(this.system, this.stream); |
| } |
| |
| /* |
| * Send a ContentEvent |
| */ |
| public void send(MessageCollector collector, ContentEvent contentEvent) { |
| if (actualSystemStream == null) |
| this.initSystemStream(); |
| |
| switch(this.scheme) { |
| case SHUFFLE: |
| this.sendShuffle(collector, contentEvent); |
| break; |
| case GROUP_BY_KEY: |
| this.sendGroupByKey(collector, contentEvent); |
| break; |
| case BROADCAST: |
| this.sendBroadcast(collector, contentEvent); |
| break; |
| } |
| } |
| |
| /* |
| * Helpers |
| */ |
| private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) { |
| collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event)); |
| } |
| |
| private void sendGroupByKey(MessageCollector collector, ContentEvent event) { |
| collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event)); |
| } |
| |
| private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) { |
| for (int i=0; i<parallelism; i++) { |
| collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event)); |
| } |
| } |
| } |
| } |