blob: b7858c6360c57acd55045e8ef678af43a194de6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.pen.util.ExampleTuple;
/**
 * Physical operator that passes tuples through an external streaming
 * command.
 *
 * <p>Input tuples pulled from the predecessor are placed on
 * {@link #binaryInputQueue}; results produced by the command are read back
 * from {@link #binaryOutputQueue}. Both queues have a capacity of one
 * element. The {@link ExecutableManager} that runs the command is created
 * lazily, the first time a tuple is available to send to it.
 */
public class POStream extends PhysicalOperator {
private static final long serialVersionUID = 2L;
// Class name of the ExecutableManager passed to the constructor; used to
// instantiate the manager lazily in getNextHelper().
private String executableManagerStr; // String representing ExecutableManager to use
private StreamingCommand command; // Actual command to be run
// Properties object that receives the "pig.streaming.ship.files" and
// "pig.streaming.cache.files" entries built in the constructor.
private Properties properties;
// Single-slot queue holding output coming back from the streaming binary.
protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
// Single-slot queue holding input on its way to the streaming binary.
protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
private transient ExecutableManager executableManager; // ExecutableManager to use
// Set to true once the ExecutableManager has been configured and run.
private transient boolean initialized = false;
// Set once the end-of-input marker has been put on binaryInputQueue.
protected transient boolean allInputFromPredecessorConsumed = false;
// Set once an end-of-stream (EOS) result has been read from the binary.
protected transient boolean allOutputFromBinaryProcessed = false;
/**
 * This flag indicates whether streaming is done through fetching. If set,
 * {@link FetchLauncher} pulls out the data from the pipeline. Therefore we need to
 * skip the case in {@link #getNextTuple()} which is called by map() or reduce() when
 * processing the next tuple.
 */
private boolean isFetchable;
public POStream(OperatorKey k, ExecutableManager executableManager,
StreamingCommand command, Properties properties) {
super(k);
this.executableManagerStr = executableManager.getClass().getName();
this.command = command;
this.properties = properties;
// Setup streaming-specific properties
if (command.getShipFiles()) {
parseShipCacheSpecs(command.getShipSpecs(),
properties, "pig.streaming.ship.files");
}
parseShipCacheSpecs(command.getCacheSpecs(),
properties, "pig.streaming.cache.files");
}
/**
 * Appends {@code specs} to the comma-separated list stored under
 * {@code property}, keeping whatever value is already there in front.
 *
 * <p>Entries are joined with {@code ", "}. When {@code specs} is
 * {@code null} or empty the property is left untouched.
 *
 * @param specs entries to append; may be {@code null}
 * @param properties properties object to read from and write to
 * @param property key of the list being extended
 */
private static void parseShipCacheSpecs(List<String> specs,
        Properties properties, String property) {
    // The current value is read before the emptiness check, as before.
    final String previous = properties.getProperty(property, "");
    if (specs == null || specs.isEmpty()) {
        return;
    }

    final StringBuilder merged = new StringBuilder(previous);
    // A separator is needed before every entry except the very first thing
    // written; an existing value counts as "something already written".
    boolean needSeparator = !previous.isEmpty();
    for (String spec : specs) {
        if (needSeparator) {
            merged.append(", ");
        }
        merged.append(spec);
        needSeparator = true;
    }
    properties.setProperty(property, merged.toString());
}
/**
 * Returns the properties object supplied at construction time, which the
 * constructor populated with the ship and cache file lists.
 *
 * @return the properties holding the ship/cache settings (the same
 *         instance, not a copy)
 */
public Properties getShipCacheProperties() {
return properties;
}
/**
 * Get the {@link StreamingCommand} for this <code>POStream</code>.
 *
 * @return the {@link StreamingCommand} this operator was constructed with
 */
public StreamingCommand getCommand() {
return command;
}
/**
 * Returns the next result for the successor in the pipeline.
 *
 * <p>Two single-slot queues decouple this operator from the streaming
 * binary: one carries input to the binary, the other carries its output
 * back. On each call:
 * <ol>
 * <li>if the binary's end-of-stream (EOS) has already been seen, EOP is
 *     returned;</li>
 * <li>if all predecessor input has already been handed over, the call
 *     blocks on the output queue and returns what the binary produced;</li>
 * <li>otherwise {@link #getNextHelper(Tuple)} either returns pending output
 *     or forwards predecessor input to the binary.</li>
 * </ol>
 * An EOS result from the binary is translated to EOP for the successor.
 *
 * @return the next result, or EOP when no further output will be produced
 * @throws ExecException (code 2083) wrapping any failure, including an
 *         interruption while blocked on a queue; in that case the thread's
 *         interrupt status is restored before throwing
 */
@Override
public Result getNextTuple() throws ExecException {
    try {
        // Everything the binary will ever emit has already been passed on.
        if (allOutputFromBinaryProcessed) {
            return RESULT_EOP;
        }

        // All input was already sent to the binary; only drain its output.
        if (allInputFromPredecessorConsumed) {
            return toSuccessorResult(binaryOutputQueue.take());
        }

        Result r = getNextHelper((Tuple) null);

        // True when called through fetch, or when the enclosing plan has
        // been flagged as having seen all of its input.
        final boolean endOfInput = isFetchable || this.parentPlan.endOfAllInput;

        if (endOfInput && r.returnStatus == POStatus.STATUS_EOP) {
            // The predecessor is exhausted. If the binary was never started
            // (no real input ever arrived) the EOP is simply passed down.
            if (getInitialized()) {
                // Signal end of all input to the binary's input side.
                binaryInputQueue.put(r);
                allInputFromPredecessorConsumed = true;

                // Look for output from the binary.
                // NOTE(review): unlike the other paths, an OK result taken
                // here is not passed to illustratorMarkup(); preserved as-is.
                r = binaryOutputQueue.take();
                if (r.returnStatus == POStatus.STATUS_EOS) {
                    r = RESULT_EOP;
                    allOutputFromBinaryProcessed = true;
                }
            }
            return r;
        }

        // Regular path (including a plain EOP while more input may still
        // arrive, which passes through unchanged).
        return toSuccessorResult(r);
    } catch (InterruptedException ie) {
        // Blocking queue operations were interrupted: keep the interrupt
        // visible to callers instead of silently clearing it.
        Thread.currentThread().interrupt();
        int errCode = 2083;
        String msg = "Error while trying to get next result in POStream.";
        throw new ExecException(msg, errCode, PigException.BUG, ie);
    } catch (Exception e) {
        int errCode = 2083;
        String msg = "Error while trying to get next result in POStream.";
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}

/**
 * Converts a result obtained from the binary into what the successor sees:
 * EOS becomes EOP (and is remembered for later calls), an OK result is
 * registered with the illustrator, anything else passes through unchanged.
 */
private Result toSuccessorResult(Result r) {
    if (r.returnStatus == POStatus.STATUS_EOS) {
        allOutputFromBinaryProcessed = true;
        return RESULT_EOP;
    }
    if (r.returnStatus == POStatus.STATUS_OK) {
        illustratorMarkup(r.result, r.result, 0);
    }
    return r;
}
/**
 * Reports whether the {@link ExecutableManager} has been set up, i.e.
 * whether at least one input tuple was available to send to the binary.
 * Synchronized on this operator, the same monitor under which
 * {@link #getNextHelper(Tuple)} sets the flag.
 *
 * @return {@code true} once the streaming binary has been started
 */
public synchronized boolean getInitialized() {
return initialized;
}
/**
 * Sets the flag recording whether the {@link ExecutableManager} has been
 * set up. Synchronized on this operator.
 *
 * @param initialized new value of the flag
 */
public synchronized void setInitialized(boolean initialized) {
this.initialized = initialized;
}
/**
 * Moves data between the predecessor and the streaming binary until there
 * is something to return.
 *
 * <p>While holding this operator's monitor, the method loops:
 * <ul>
 * <li>if the output queue holds a result, that result is returned;</li>
 * <li>else, if the input queue has room, the next predecessor result is
 *     fetched via {@code processInput()}; EOP and ERR are returned
 *     directly, any other result is queued for the binary (starting the
 *     {@link ExecutableManager} first if this is the first such input);</li>
 * <li>else it waits until output is available or the input queue drains.</li>
 * </ul>
 *
 * @param t not used by this implementation
 * @return output from the binary, or an EOP/ERR result from the predecessor
 * @throws ExecException (code 2083) wrapping any failure, including an
 *         interruption while blocked; in that case the thread's interrupt
 *         status is restored before throwing
 */
public Result getNextHelper(Tuple t) throws ExecException {
    try {
        synchronized (this) {
            while (true) {
                // Pending output from the binary always takes priority.
                if (!binaryOutputQueue.isEmpty()) {
                    return binaryOutputQueue.take();
                }

                if (binaryInputQueue.remainingCapacity() > 0) {
                    // There is room to hand another input to the binary.
                    Result input = processInput();
                    if (input.returnStatus == POStatus.STATUS_EOP
                            || input.returnStatus == POStatus.STATUS_ERR) {
                        return input;
                    }

                    // The binary is started only when the first real input
                    // shows up: some tasks may never have any input for it,
                    // so it is not launched unless needed.
                    if (!initialized) {
                        executableManager = (ExecutableManager)
                                PigContext.instantiateFuncFromSpec(executableManagerStr);
                        try {
                            executableManager.configure(this);
                            executableManager.run();
                        } catch (IOException ioe) {
                            // NOTE(review): this exception is caught by the
                            // outer catch below and re-wrapped with code 2083.
                            int errCode = 2084;
                            String msg = "Error while running streaming binary.";
                            throw new ExecException(msg, errCode, PigException.BUG, ioe);
                        }
                        initialized = true;
                    }

                    // Send this input to the streaming process.
                    binaryInputQueue.put(input);
                } else {
                    // Input slot is full and there is no output yet: wait
                    // for output to appear or for the input to be consumed.
                    // NOTE(review): no notify on this monitor appears in this
                    // class; presumably the ExecutableManager's handler
                    // threads issue it -- confirm.
                    while (binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) {
                        wait();
                    }
                }
            }
        }
    } catch (InterruptedException ie) {
        // take()/put()/wait() were interrupted: restore the interrupt
        // status so callers can still observe it.
        Thread.currentThread().interrupt();
        int errCode = 2083;
        String msg = "Error while trying to get next result in POStream.";
        throw new ExecException(msg, errCode, PigException.BUG, ie);
    } catch (Exception e) {
        int errCode = 2083;
        String msg = "Error while trying to get next result in POStream.";
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}
/**
 * Renders this operator as
 * {@code <alias>POStream[<command>] - <operator key>}.
 *
 * @return the textual form described above
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append(getAliasString());
    text.append("POStream");
    text.append("[").append(command.toString()).append("]");
    text.append(" - ").append(mKey.toString());
    return text.toString();
}
/**
 * Dispatches to {@link PhyPlanVisitor#visitStream(POStream)} for this
 * operator.
 *
 * @param v visitor to dispatch to
 * @throws VisitorException if the visitor reports a failure
 */
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitStream(this);
}
/**
 * Returns the operator's name, which is the same text as
 * {@link #toString()}.
 *
 * @return the value of {@link #toString()}
 */
@Override
public String name() {
return toString();
}
/**
 * Reports that this operator does not accept multiple inputs.
 *
 * @return always {@code false}
 */
@Override
public boolean supportsMultipleInputs() {
return false;
}
/**
 * Reports that this operator does not produce multiple outputs.
 *
 * @return always {@code false}
 */
@Override
public boolean supportsMultipleOutputs() {
return false;
}
/**
 * Closes the {@link ExecutableManager} that runs the streaming binary.
 *
 * <p>The manager is created lazily, only once a first input tuple is
 * available, so it may not exist when this method is called. In that case
 * there is nothing to close and the call is a no-op.
 *
 * @throws IOException if closing the manager fails
 */
public void finish() throws IOException {
    if (executableManager != null) {
        executableManager.close();
    }
}
/**
 * Exposes the single-slot queue on which this operator places input
 * destined for the streaming binary.
 *
 * @return the Queue which has input to binary (the live instance, not a copy)
 */
public BlockingQueue<Result> getBinaryInputQueue() {
return binaryInputQueue;
}
/**
 * Exposes the single-slot queue from which this operator reads output
 * produced by the streaming binary.
 *
 * @return the Queue which has output from binary (the live instance, not a copy)
 */
public BlockingQueue<Result> getBinaryOutputQueue() {
return binaryOutputQueue;
}
/**
 * Registers a streamed tuple with the illustrator, when one is attached.
 *
 * <p>With an illustrator present, {@code in} is added to the equivalence
 * class at {@code eqClassIndex} and {@code out} is recorded as data. Without
 * one, nothing is recorded.
 *
 * @param in input tuple; cast to {@link ExampleTuple} when an illustrator is set
 * @param out output tuple
 * @param eqClassIndex index of the equivalence class to add {@code in} to
 * @return {@code out} cast to {@link Tuple}
 */
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
    if (illustrator == null) {
        return (Tuple) out;
    }
    final ExampleTuple exampleIn = (ExampleTuple) in;
    illustrator.getEquivalenceClasses().get(eqClassIndex).add(exampleIn);
    final Tuple outTuple = (Tuple) out;
    illustrator.addData(outTuple);
    return outTuple;
}
/**
 * Reports whether this operator is driven through fetching. When set,
 * {@link #getNextTuple()} treats every call as an end-of-input call.
 *
 * @return true if streaming is done through fetching
 */
public boolean isFetchable() {
return isFetchable;
}
/**
 * Marks whether this operator is driven through fetching.
 *
 * @param isFetchable - whether fetching is applied on POStream
 */
public void setFetchable(boolean isFetchable) {
this.isFetchable = isFetchable;
}
/**
 * Creates a copy of this operator with its own pair of single-slot queues,
 * so the copy does not share in-flight data with the original.
 *
 * @return the cloned operator
 * @throws CloneNotSupportedException if the superclass cannot be cloned
 */
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
    final POStream copy = (POStream) super.clone();
    // The StreamingCommand is read only, so the copy keeps the same one.
    copy.binaryInputQueue = new ArrayBlockingQueue<Result>(1);
    copy.binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
    return copy;
}
}