blob: d472a653f638e0d455cd2d6e863b26443364bc99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samoa.topology.impl;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.topology.AbstractStream;
import org.apache.samoa.topology.IProcessingItem;
import org.apache.samoa.utils.PartitioningScheme;
import org.apache.samoa.utils.StreamDestination;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
/**
* Stream for SAMOA on Samza
*
* @author Anh Thu Vu
*/
public class SamzaStream extends AbstractStream implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final String DEFAULT_SYSTEM_NAME = "kafka";
private List<SamzaSystemStream> systemStreams;
private transient MessageCollector collector;
private String systemName;
/*
* Constructor
*/
public SamzaStream(IProcessingItem sourcePi) {
super(sourcePi);
this.systemName = DEFAULT_SYSTEM_NAME;
// Get name/id for this stream
SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
int index = samzaPi.addOutputStream(this);
this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
// init list of SamzaSystemStream
systemStreams = new ArrayList<SamzaSystemStream>();
}
/*
* System name (Kafka)
*/
public void setSystemName(String systemName) {
this.systemName = systemName;
for (SamzaSystemStream systemStream : systemStreams) {
systemStream.setSystem(systemName);
}
}
public String getSystemName() {
return this.systemName;
}
/*
* Add the PI to the list of destinations. Return the name of the
* corresponding SystemStream.
*/
public SamzaSystemStream addDestination(StreamDestination destination) {
PartitioningScheme scheme = destination.getPartitioningScheme();
int parallelism = destination.getParallelism();
SamzaSystemStream resultStream = null;
for (int i = 0; i < systemStreams.size(); i++) {
// There is an existing SystemStream that matches the settings.
// Do not create a new one
if (systemStreams.get(i).isSame(scheme, parallelism)) {
resultStream = systemStreams.get(i);
}
}
// No existing SystemStream match the requirement
// Create a new one
if (resultStream == null) {
String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism);
systemStreams.add(resultStream);
}
return resultStream;
}
public void setCollector(MessageCollector collector) {
this.collector = collector;
}
public MessageCollector getCollector() {
return this.collector;
}
public void onCreate() {
for (SamzaSystemStream stream : systemStreams) {
stream.initSystemStream();
}
}
/*
* Implement Stream interface
*/
@Override
public void put(ContentEvent event) {
for (SamzaSystemStream stream : systemStreams) {
stream.send(collector, event);
}
}
public List<SamzaSystemStream> getSystemStreams() {
return this.systemStreams;
}
/**
* SamzaSystemStream wrap around a Samza's SystemStream It contains the info to create a Samza stream during the
* constructing process of the topology and will create the actual Samza stream when the topology is submitted
* (invoking initSystemStream())
*
* @author Anh Thu Vu
*/
public static class SamzaSystemStream implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
private String system;
private String stream;
private PartitioningScheme scheme;
private int parallelism;
private transient SystemStream actualSystemStream = null;
/*
* Constructors
*/
public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
this.system = system;
this.stream = stream;
this.scheme = scheme;
this.parallelism = parallelism;
}
public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
this(system, stream, scheme, 1);
}
/*
* Setters
*/
public void setSystem(String system) {
this.system = system;
}
/*
* Getters
*/
public String getSystem() {
return this.system;
}
public String getStream() {
return this.stream;
}
public PartitioningScheme getPartitioningScheme() {
return this.scheme;
}
public int getParallelism() {
return this.parallelism;
}
public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
return (this.scheme == scheme && this.parallelism == parallelismHint);
}
/*
* Init the actual Samza stream
*/
public void initSystemStream() {
actualSystemStream = new SystemStream(this.system, this.stream);
}
/*
* Send a ContentEvent
*/
public void send(MessageCollector collector, ContentEvent contentEvent) {
if (actualSystemStream == null)
this.initSystemStream();
switch (this.scheme) {
case SHUFFLE:
this.sendShuffle(collector, contentEvent);
break;
case GROUP_BY_KEY:
this.sendGroupByKey(collector, contentEvent);
break;
case BROADCAST:
this.sendBroadcast(collector, contentEvent);
break;
}
}
/*
* Helpers
*/
private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
}
private void sendGroupByKey(MessageCollector collector, ContentEvent event) {
collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
}
private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
for (int i = 0; i < parallelism; i++) {
collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
}
}
}
}