blob: b463b36ebb1fb506e949bc2e4f05625d679785bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api.python.executor;
import com.google.protobuf.ByteString;
import org.apache.wayang.api.python.function.PythonUdf;
import org.apache.wayang.core.api.exception.WayangException;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
public class ProcessFeeder<Input, Output> {
private Socket socket;
private PythonUdf<Input, Output> udf;
private ByteString serializedUDF;
private Iterable<Input> input;
//TODO add to a config file
int END_OF_DATA_SECTION = -1;
int NULL = -5;
public ProcessFeeder(
Socket socket,
PythonUdf<Input, Output> udf,
ByteString serializedUDF,
Iterable<Input> input){
if(input == null) throw new WayangException("Nothing to process with Python API");
this.socket = socket;
this.udf = udf;
this.serializedUDF = serializedUDF;
this.input = input;
}
public void send(){
try{
//TODO use config buffer size
int BUFFER_SIZE = 8192;
BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
DataOutputStream dataOut = new DataOutputStream(stream);
writeUDF(serializedUDF, dataOut);
this.writeIteratorToStream(input.iterator(), dataOut);
dataOut.writeInt(END_OF_DATA_SECTION);
dataOut.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeUDF(ByteString serializedUDF, DataOutputStream dataOut){
//write(serializedUDF.toByteArray(), dataOut);
writeBytes(serializedUDF.toByteArray(), dataOut);
System.out.println("UDF written");
}
public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut){
System.out.println("iterator being send");
for (Iterator<Input> it = iter; it.hasNext(); ) {
Input elem = it.next();
//System.out.println(elem.toString());
write(elem, dataOut);
}
}
/*TODO Missing case PortableDataStream */
public void write(Object obj, DataOutputStream dataOut){
try {
if(obj == null)
dataOut.writeInt(this.NULL);
/**
* Byte Array cases
*/
else if (obj instanceof Byte[] || obj instanceof byte[]) {
System.out.println("Writing Bytes");
writeBytes(obj, dataOut);
}
/**
* String case
* */
else if (obj instanceof String)
writeUTF((String) obj, dataOut);
/**
* Key, Value case
* */
else if (obj instanceof Map.Entry)
writeKeyValue((Map.Entry) obj, dataOut);
else{
throw new WayangException("Unexpected element type " + obj.getClass());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeBytes(Object obj, DataOutputStream dataOut){
try{
if (obj instanceof Byte[]) {
int length = ((Byte[]) obj).length;
byte[] bytes = new byte[length];
int j=0;
// Unboxing Byte values. (Byte[] to byte[])
for(Byte b: ((Byte[]) obj))
bytes[j++] = b.byteValue();
dataOut.writeInt(length);
dataOut.write(bytes);
} else if (obj instanceof byte[]) {
dataOut.writeInt(((byte[]) obj).length);
dataOut.write(((byte[]) obj));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeUTF(String str, DataOutputStream dataOut){
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
try {
dataOut.writeInt(bytes.length);
dataOut.write(bytes);
} catch (SocketException e){
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){
write(obj.getKey(), dataOut);
write(obj.getValue(), dataOut);
}
}