blob: dc8440568b68327dbb8693a54cc21c19fa32ae38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.streaming;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.pig.PigStreamingBase;
import org.apache.pig.PigToStream;
import org.apache.pig.data.WritableByteArray;
import org.apache.pig.data.Tuple;
/**
* {@link InputHandler} is responsible for handling the input to the
* Pig-Streaming external command.
*
* The managed executable could be fed input in a {@link InputType#SYNCHRONOUS}
* manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS}
* manner via an external file which is subsequently read by the executable.
*/
public abstract class InputHandler {
/**
*
*/
public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
/*
* The serializer to be used to send data to the managed process.
*
* It is the responsibility of the concrete sub-classes to setup and
* manage the serializer.
*/
protected PigToStream serializer;
private PigStreamingBase newSerializer;
private OutputStream out;
// flag to mark if close() has already been called
protected boolean alreadyClosed = false;
/**
* Get the handled <code>InputType</code>
* @return the handled <code>InputType</code>
*/
public abstract InputType getInputType();
/**
* Send the given input <code>Tuple</code> to the managed executable.
*
* @param t input <code>Tuple</code>
* @throws IOException
*/
public void putNext(Tuple t) throws IOException {
if (newSerializer != null) {
WritableByteArray buf = newSerializer.serializeToBytes(t);
out.write(buf.getData(), 0, buf.getLength());
} else {
out.write(serializer.serialize(t));
}
}
/**
* Close the <code>InputHandler</code> since there is no more input
* to be sent to the managed process.
* @param process the managed process - this could be null in some cases
* like when input is through files. In that case, the process would not
* have been exec'ed yet - if this method if overridden it is the responsibility
* of the implementer to check that the process is usable. The managed process
* object is supplied by the ExecutableManager to this call so that this method
* can check if the process is alive if it needs to know.
*
* @throws IOException
*/
public synchronized void close(Process process) throws IOException {
if(!alreadyClosed) {
alreadyClosed = true;
out.flush();
out.close();
out = null;
}
}
/**
* Bind the <code>InputHandler</code> to the <code>OutputStream</code>
* from which it reads input and sends it to the managed process.
*
* @param os <code>OutputStream</code> from which to read input data for the
* managed process
* @throws IOException
*/
public void bindTo(OutputStream os) throws IOException {
out = os;
if (this.serializer instanceof PigStreamingBase) {
this.newSerializer = (PigStreamingBase) serializer;
}
}
}