blob: 771cbc894306c44e83096f279a246abf3d0253f2 [file] [log] [blame]
package org.apache.samoa.topology.impl;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.topology.EntranceProcessingItem;
import org.apache.samoa.topology.Stream;
// TODO adapt this entrance processing item to connect to external streams so the application doesnt need to use an AdapterApp
public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem {
private EntranceProcessor entranceProcessor;
// private S4DoTask app;
private int parallelism;
protected Stream outputStream;
/**
* Constructor of an S4 entrance processing item.
*
* @param app
* : S4 application
*/
public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) {
super(app);
this.entranceProcessor = entranceProcessor;
// this.app = (S4DoTask) app;
// this.setSingleton(true);
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public int getParallelism() {
return this.parallelism;
}
@Override
public EntranceProcessor getProcessor() {
return this.entranceProcessor;
}
//
// @Override
// public void put(Instance inst) {
// // do nothing
// // may not needed
// }
@Override
protected void onCreate() {
// was commented
if (this.entranceProcessor != null) {
// TODO revisit if we need to change it to a clone() call
this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor);
this.entranceProcessor.onCreate(Integer.parseInt(getId()));
}
}
@Override
protected void onRemove() {
// do nothing
}
//
// /**
// * Sets the entrance processing item processor.
// *
// * @param processor
// */
// public void setProcessor(Processor processor) {
// this.entranceProcessor = processor;
// }
@Override
public void setName(String name) {
super.setName(name);
}
@Override
public EntranceProcessingItem setOutputStream(Stream stream) {
if (this.outputStream != null)
throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once");
this.outputStream = stream;
return this;
}
public boolean injectNextEvent() {
if (entranceProcessor.hasNext()) {
ContentEvent nextEvent = this.entranceProcessor.nextEvent();
outputStream.put(nextEvent);
return entranceProcessor.hasNext();
} else
return false;
// return !nextEvent.isLastEvent();
}
}