blob: 734462e4fece2a461815f3228e2cf6e64e9b0ce9 [file] [log] [blame]
package org.apache.samoa.topology.impl;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.s4.base.KeyFinder;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.topology.AbstractStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* S4 Platform specific stream.
*
* @author severien
*
*/
public class S4Stream extends AbstractStream {
public static final int SHUFFLE = 0;
public static final int GROUP_BY_KEY = 1;
public static final int BROADCAST = 2;
private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
private S4DoTask app;
private int processingItemParalellism;
private int shuffleCounter;
private static final String NAME = "STREAM-";
private static int OBJ_COUNTER = 0;
/* The stream list */
public List<StreamType> streams;
public S4Stream(S4DoTask app) {
super();
this.app = app;
this.processingItemParalellism = 1;
this.shuffleCounter = 0;
this.streams = new ArrayList<StreamType>();
this.setStreamId(NAME + OBJ_COUNTER);
OBJ_COUNTER++;
}
public S4Stream(S4DoTask app, S4ProcessingItem pi) {
super();
this.app = app;
this.processingItemParalellism = 1;
this.shuffleCounter = 0;
this.streams = new ArrayList<StreamType>();
this.setStreamId(NAME + OBJ_COUNTER);
OBJ_COUNTER++;
}
/**
*
* @return
*/
public int getParallelism() {
return processingItemParalellism;
}
public void setParallelism(int parallelism) {
this.processingItemParalellism = parallelism;
}
public void addStream(String streamID, KeyFinder<S4Event> finder,
S4ProcessingItem pi, int type) {
String streamName = streamID + "_" + pi.getName();
org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
streamName, pi);
stream.setName(streamName);
logger.debug("Stream name S4Stream {}", streamName);
if (finder != null)
stream.setKey(finder);
this.streams.add(new StreamType(stream, type));
}
@Override
public void put(ContentEvent event) {
for (int i = 0; i < streams.size(); i++) {
switch (streams.get(i).getType()) {
case SHUFFLE:
S4Event s4event = new S4Event(event);
s4event.setStreamId(streams.get(i).getStream().getName());
if (getParallelism() == 1) {
s4event.setKey("0");
} else {
s4event.setKey(Integer.toString(shuffleCounter));
}
streams.get(i).getStream().put(s4event);
shuffleCounter++;
if (shuffleCounter >= (getParallelism())) {
shuffleCounter = 0;
}
break;
case GROUP_BY_KEY:
S4Event s4event1 = new S4Event(event);
s4event1.setStreamId(streams.get(i).getStream().getName());
HashCodeBuilder hb = new HashCodeBuilder();
hb.append(event.getKey());
String key = Integer.toString(hb.build() % getParallelism());
s4event1.setKey(key);
streams.get(i).getStream().put(s4event1);
break;
case BROADCAST:
for (int p = 0; p < this.getParallelism(); p++) {
S4Event s4event2 = new S4Event(event);
s4event2.setStreamId(streams.get(i).getStream().getName());
s4event2.setKey(Integer.toString(p));
streams.get(i).getStream().put(s4event2);
}
break;
default:
break;
}
}
}
/**
* Subclass for definig stream connection type
*
* @author severien
*
*/
class StreamType {
org.apache.s4.core.Stream<S4Event> stream;
int type;
public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
this.stream = s;
this.type = t;
}
public org.apache.s4.core.Stream<S4Event> getStream() {
return stream;
}
public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
this.stream = stream;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
}
}