blob: 7f0996f866ee5ea009d797c376318beaca5fdc70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samoa.topology.impl;
import java.io.Serializable;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.Processor;
import org.apache.samoa.topology.AbstractProcessingItem;
import org.apache.samoa.topology.ProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import org.apache.samoa.utils.PartitioningScheme;
import org.apache.samoa.utils.SamzaConfigFactory;
import org.apache.samoa.utils.StreamDestination;
import org.apache.samoa.utils.SystemsUtils;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
/**
 * ProcessingItem for Samza which is also a Samza task (StreamTask and InitableTask).
 *
 * <p>
 * An instance is used in two ways: it is built through
 * {@link #SamzaProcessingItem(Processor, int)} and wired to its input and output streams, and it is
 * created through the no-argument constructor and then populated by {@link #init(Config, TaskContext)}.
 *
 * <p>
 * Serialization goes through {@link SerializationProxy}: only the processor and the output streams are
 * written out, and {@link #init(Config, TaskContext)} reads them back.
 *
 * @author Anh Thu Vu
 */
public class SamzaProcessingItem extends AbstractProcessingItem
    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {

  private static final long serialVersionUID = 1L;

  // Initialized eagerly so that an instance created through the no-argument constructor never exposes
  // null collections (previously both fields stayed null until init() ran).
  private Set<SamzaSystemStream> inputStreams = new HashSet<SamzaSystemStream>(); // input streams: system.stream
  private List<SamzaStream> outputStreams = new LinkedList<SamzaStream>();

  /*
   * Constructors
   */

  /**
   * No-argument constructor. Need this so Samza can initialize a StreamTask.
   */
  public SamzaProcessingItem() {
  }

  /**
   * Creates a processing item wrapping the given processor.
   *
   * @param processor
   *          the processor handed to the superclass constructor
   * @param parallelismHint
   *          the parallelism hint handed to the superclass constructor
   */
  public SamzaProcessingItem(Processor processor, int parallelismHint) {
    super(processor, parallelismHint);
  }

  /*
   * Simple setters, getters
   */

  /**
   * @return the live (not copied) set of input system streams registered through
   *         {@link #addInputStream(Stream, PartitioningScheme)}
   */
  public Set<SamzaSystemStream> getInputStreams() {
    return this.inputStreams;
  }

  /*
   * Extends AbstractProcessingItem
   */

  /**
   * Registers this item as a destination of {@code inputStream}, using this item's parallelism and the
   * given partitioning scheme, and remembers the system stream returned by that registration.
   *
   * @param inputStream
   *          the stream to consume; it is cast to {@link SamzaStream}, so any other implementation
   *          results in a {@link ClassCastException}
   * @param scheme
   *          the partitioning scheme passed on to the new {@link StreamDestination}
   * @return this processing item
   */
  @Override
  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
    StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
    SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(destination);
    this.inputStreams.add(stream);
    return this;
  }

  /*
   * Implement org.apache.samoa.topology.impl.SamzaProcessingNode
   */

  /**
   * Appends {@code stream} to the output streams of this item.
   *
   * @param stream
   *          the output stream to add
   * @return the number of output streams held after the addition
   */
  @Override
  public int addOutputStream(SamzaStream stream) {
    this.outputStreams.add(stream);
    return this.outputStreams.size();
  }

  /**
   * @return the live (not copied) list of output streams
   */
  public List<SamzaStream> getOutputStreams() {
    return this.outputStreams;
  }

  /*
   * Implement Samza task
   */

  /**
   * Restores the processor and output streams of this task from the serialized state referenced by the
   * configuration, then calls {@code onCreate} on the processor and on every output stream.
   *
   * @param config
   *          configuration providing the values for {@link SamzaConfigFactory#YARN_CONF_HOME_KEY},
   *          {@link SamzaConfigFactory#FILE_KEY}, {@link SamzaConfigFactory#FILESYSTEM_KEY} and
   *          {@link SamzaConfigFactory#JOB_NAME_KEY}
   * @param context
   *          the task context (not used by this implementation)
   * @throws IllegalStateException
   *           if no serialized state is found for this job
   */
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
    // Only point Hadoop at a configuration directory if the property is set; otherwise assume we are
    // running in local mode and ignore it.
    if (yarnConfHome != null && !yarnConfHome.isEmpty()) {
      SystemsUtils.setHadoopConfigHome(yarnConfHome);
    }

    String filename = config.get(SamzaConfigFactory.FILE_KEY);
    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));

    // The serialized form of a SamzaProcessingItem is its SerializationProxy (see writeReplace()).
    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
        filename, this.getName());
    if (wrapper == null) {
      // Fail with a descriptive message instead of an anonymous NullPointerException below.
      throw new IllegalStateException("No serialized state found for job '" + this.getName() + "' in file '"
          + filename + "' on filesystem '" + filesystem + "'");
    }
    this.setProcessor(wrapper.processor);
    this.outputStreams = wrapper.outputStreams;

    // Init Processor and Streams
    this.getProcessor().onCreate(0);
    for (SamzaStream stream : this.outputStreams) {
      stream.onCreate();
    }
  }

  /**
   * Hands the collector of the current invocation to every output stream and then lets the processor
   * handle the message carried by {@code envelope}.
   *
   * @param envelope
   *          the incoming message; its payload is cast to {@link ContentEvent}
   * @param collector
   *          the collector set on each output stream before the message is processed
   * @param coordinator
   *          the task coordinator (not used by this implementation)
   */
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    for (SamzaStream stream : this.outputStreams) {
      stream.setCollector(collector);
    }
    this.getProcessor().process((ContentEvent) envelope.getMessage());
  }

  /*
   * SerializationProxy
   */

  /**
   * Serialization hook: substitutes a {@link SerializationProxy} for this object when it is written.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  /**
   * Serialized stand-in for a {@link SamzaProcessingItem}, carrying only its processor and its output
   * streams. It declares no {@code readResolve}, so deserialization yields the proxy itself, which is
   * what {@link SamzaProcessingItem#init(Config, TaskContext)} expects.
   */
  private static class SerializationProxy implements Serializable {

    private static final long serialVersionUID = 1534643987559070336L;

    private final Processor processor;
    private final List<SamzaStream> outputStreams;

    public SerializationProxy(SamzaProcessingItem pi) {
      this.processor = pi.getProcessor();
      this.outputStreams = pi.getOutputStreams();
    }
  }
}