blob: b57504290a2137c65463d6eb86342a4cd5df9bdb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.apex.malhar.python.base.jep;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.python.base.ApexPythonEngine;
import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
import org.apache.apex.malhar.python.base.WorkerExecutionMode;
import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import static;
* <p>Implements the {@link ApexPythonEngine} interface by using the JEP ( Java Embedded Python ) engine. It is an
* in-memory interpreter and has the following characteristics:
* <ol>
* <li>The python engine allows for 4 major API patterns
* <ul>
* <li>Execute a method call by accepting parameters to pass to the interpreter</li>
* <li>Execute a python script as given in a file path</li>
* <li>Evaluate an expression and allows for passing of variables between the java code and the python
* in-memory interpreter bridge</li>
* <li>A handy method wherein a series of instructions can be passed in one single java call ( executed as a
* sequence of python eval instructions under the hood ) </li>
* </ul>
* <li>Automatic garbage collection of the variables that are passed from java code to the in memory python
* interpreter</li>
* <li>Support for all major python libraries. Tensorflow, Keras, Scikit, XGBoost</li>
* <li>The python engine uses the concept of a worker thread that is responsible for executing any of the 4
* patterns mentioned above. The worker thread is implemented by {@link InterpreterThread}</li>
* <li>The implementation allows for SLA based execution model. i.e. the caller can stipulate that if the call is not
* complete within a time out, the engine code returns back null. See {@link InterpreterWrapper}</li>
* <li>Supports the concept of stragglers i.e. the processing of a request can complete eventually and the
* result available from a queue called as the delayed response queue</li>
* <li>Supports the concept of executing a call on all of the worker threads if required. This is to ensure the
* following use cases:
* <ul>
* <li>Since this is an interpreter, the users can make use of an earlier calls variable definition if
* need be. In such cases, the caller will have the need for a sticky thread i.e. all such calls need to
* end up on the same thread.</li>
* <li>Another reason is to implement the concept of Dynamic partitioning. Since interpreter accumulates
* state due to commands run on it, if a new partition is introduced at runtime, this can failures for all
* subsequent commands as they might depend on variables created in previous windows</li>
* </ul>
* </li>
* </li>
* </ol>
* </p>
* <p> Note that the engine implementation can be used independent of an Operator i.e. as a utility stack if need be.
* Some of the API signatures need a window ID and request ID but they do not necessarily mean that the API
* signatures are bound to an operator lifecycle. These parameters are used for efficient thread usage only and
* the API only needs a monotonically increasing number in true sense.
* </p>
* <p>
* JEP needs to be installed on all of the YARN nodes prior to the use of the JEP engine until docker support is
* available for Apex. Virtual environments are not supported yet. If multiple versions of python are present
* on the YARN nodes, ensure the JVM option java.library.path is pointing to the right version of JEP which in
* turn will ensure the right version of python to be used at runtime.
* </p>
* <p>
* JEPPythonEngine works on the concept of a worker pool. The engine maintains a configurable number of workers and
* each worker has a dedicated request and response queue. While this class is responsible for choosing the
* right worker from the pool of workers for a given request , the {@link InterpreterWrapper} class is responsible
* for maintaining the time bound SLAs.
* </p>
public class JepPythonEngine implements ApexPythonEngine
private static final Logger LOG = LoggerFactory.getLogger(JepPythonEngine.class);
/* Size of the worker pool */
private int numWorkerThreads = 3;
/* A name that can be used while logging messages and also used to set thread names */
private String threadGroupName;
private static final String JEP_LIBRARY_NAME = "jep";
private transient List<PythonRequestResponse> commandHistory = new ArrayList<>();
/* Spin policy for the disruptor queue implementation */
private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
// Represents the number of responses that can be held in the queue
private int bufferCapacity = 64;
/* Time used to sleep in the beginning of the interpreter threads run i.e. start while initializing the interpreter.
Note that booting of the memory interpreter can be a really heavy process depending on the libraries that
are being loaded and hence this variable */
private long sleepTimeAfterInterpreterStart = 3000; // 3 secs
* Represents the queue into which all the stragglers are drained into
private transient BlockingQueue<PythonRequestResponse> delayedResponseQueue =
new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
private List<InterpreterWrapper> workers = new ArrayList<>();
private Map<PythonInterpreterConfig, Object> preInitConfigs;
private long numStarvedReturns = 0;
* Created the JEP Python engine instance but does not start the interpreters yet
* @param threadGroupName A name that represents all the workers in this thread
* @param numWorkerThreads Number of workers in the work pool
public JepPythonEngine(String threadGroupName,int numWorkerThreads)
this.numWorkerThreads = numWorkerThreads;
this.threadGroupName = threadGroupName;
private void initWorkers() throws ApexPythonInterpreterException
{"Attempting to load the JEP dynamic library");
System.loadLibrary(JEP_LIBRARY_NAME);"Successfully loaded the JEP dynamic library in memory");
SpinPolicy spinPolicyForReqQueue = SpinPolicy.WAITING;
if (preInitConfigs.containsKey(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY)) {
spinPolicyForReqQueue = (SpinPolicy)preInitConfigs.get(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY);
for ( int i = 0; i < numWorkerThreads; i++) {
InterpreterWrapper aWorker = new InterpreterWrapper(threadGroupName + "-" + i,delayedResponseQueue,
* Used to select the right worker from the work pool. The goal is to round robin the workers as far as possible.
* Factors like busy workers can mean that the next available worker is chosen
* @param requestId Used to round robin the requests. Need not necessarily mean only an operator can use this engine.
* @return A worker from the worker pool. Null if all workers are busy.
protected InterpreterWrapper selectWorkerForCurrentCall(long requestId)
int slotToLookFor = Ints.saturatedCast(requestId) % numWorkerThreads;
LOG.debug("Slot that is being looked for in the worker pool " + slotToLookFor);
boolean isWorkerFound = false;
int numWorkersScannedForAvailability = 0;
InterpreterWrapper aWorker = null;
while ( (!isWorkerFound) && (numWorkersScannedForAvailability < numWorkerThreads)) {
aWorker = workers.get(slotToLookFor);
numWorkersScannedForAvailability = numWorkersScannedForAvailability + 1;
if (!aWorker.isCurrentlyBusy()) {
isWorkerFound = true;
LOG.debug("Found worker with index as " + slotToLookFor);
} else {
LOG.debug("Thread ID is currently busy " + aWorker.getInterpreterId());
slotToLookFor = slotToLookFor + 1;
if ( slotToLookFor == numWorkerThreads) {
slotToLookFor = 0;
if (isWorkerFound) {
return aWorker;
} else {
numStarvedReturns += 1;
return null;
* See {@link ApexPythonEngine#preInitInterpreter(Map)} for more details
* @param preInitConfigs The configuration that is going to be used by the interpreter.See constants
* defined in {@link PythonInterpreterConfig} for a list of keys available
* @throws ApexPythonInterpreterException if an issue while executing the pre interpreter logic
public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
throws ApexPythonInterpreterException
this.preInitConfigs = preInitConfigs;
* Starts all of the worker threads. Also sleeps for a few moments to ensure "fat" frameworks like Tensorflow can
* be allowed to boot completely.
* @throws ApexPythonInterpreterException
public void startInterpreter() throws ApexPythonInterpreterException
try {
if (sleepTimeAfterInterpreterStart > 0) {
LOG.debug("Sleeping to let the interpreter boot up in memory");
} catch (InterruptedException e) {
throw new RuntimeException(e);
* Used to execute all of the commands from the command history when an operator is instantiating a new instance of
* the engine. Used by the dynamic partitioner to let a newly provisioned operator to catch up to the state of all of
* the remaining operator instances
* @throws ApexPythonInterpreterException
public void postStartInterpreter() throws ApexPythonInterpreterException
for ( InterpreterWrapper wrapper : workers) {
for (PythonRequestResponse requestResponse : commandHistory) {
PythonInterpreterRequest requestPayload = requestResponse.getPythonInterpreterRequest();
try {
} catch (InterruptedException e) {
throw new ApexPythonInterpreterException(e);
* See {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
* details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
* set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
* number of worker threads
* @param executionMode Whether these commands need to be run on all worker nodes or any of the worker node
* @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
* @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
* @param request Represents the request to be processed.
* @return A map containing the command as key and boolean representing success or failure as the value.
* @throws ApexPythonInterpreterException
public Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode,long windowId,
long requestId, PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException
checkNotNull(request.getGenericCommandsRequestPayload(), "Run commands payload not set");
"Commands that need to be run not set");
Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
PythonRequestResponse lastSuccessfullySubmittedRequest = null;
try {
if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
LOG.debug("Executing run commands on all of the interpreter worker threads");
long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(),request.getTimeUnit()) /
LOG.debug("Allocating " + timeOutPerWorker + " nanoseconds for each of the worker threads");
if ( timeOutPerWorker == 0) {
timeOutPerWorker = 1;
for ( InterpreterWrapper wrapper : workers) {
lastSuccessfullySubmittedRequest = wrapper.runCommands(windowId,requestId,request);
if (lastSuccessfullySubmittedRequest != null) {
returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
if ( returnStatus.size() > 0) {
} else {
InterpreterWrapper currentThread = null;
if (executionMode.equals(WorkerExecutionMode.ANY)) {
LOG.debug("Executing run commands on a single interpreter worker thread");
currentThread = selectWorkerForCurrentCall(requestId);
if (executionMode.equals(WorkerExecutionMode.STICKY)) {
currentThread = workers.get(request.hashCode() % numWorkerThreads);
LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
if (currentThread != null) {
lastSuccessfullySubmittedRequest = currentThread.runCommands(windowId, requestId, request);
if (lastSuccessfullySubmittedRequest != null) {
returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
} else {
throw new ApexPythonInterpreterException("No free interpreter threads available." +
" Consider increasing workers and relaunch");
} catch (InterruptedException e) {
throw new ApexPythonInterpreterException(e);
return returnStatus;
* See {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
* details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
* set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
* number of worker threads
* @param executionMode If the method call needs to be invoked on all workers or any single worker
* @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
* @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
* @param req Represents the request to be processed.
* @param <T>
* @return
* @throws ApexPythonInterpreterException
public <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException
checkNotNull(req.getMethodCallRequest(), "Method call info not set");
checkNotNull(req.getMethodCallRequest().getNameOfMethod(), "Method name not set");
Map<String,PythonRequestResponse<T>> returnStatus = new HashMap<>();
PythonRequestResponse lastSuccessfullySubmittedRequest = null;
try {
if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
if ( timeOutPerWorker == 0) {
timeOutPerWorker = 1;
for ( InterpreterWrapper wrapper : workers) {
lastSuccessfullySubmittedRequest = wrapper.executeMethodCall(windowId,requestId,req);
if ( lastSuccessfullySubmittedRequest != null) {
returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
if ( returnStatus.size() > 0) {
} else {
InterpreterWrapper currentThread = null;
if (executionMode.equals(WorkerExecutionMode.ANY)) {
currentThread = selectWorkerForCurrentCall(requestId);
if (executionMode.equals(WorkerExecutionMode.STICKY)) {
currentThread = workers.get(req.hashCode() % numWorkerThreads);
LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
if (currentThread != null) {
lastSuccessfullySubmittedRequest = currentThread.executeMethodCall(windowId, requestId, req);
if (lastSuccessfullySubmittedRequest != null) {
returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
} else {
throw new ApexPythonInterpreterException("No free interpreter threads available." +
" Consider increasing workers and relaunch");
} catch (InterruptedException e) {
throw new ApexPythonInterpreterException(e);
return returnStatus;
* See {@link ApexPythonEngine#executeScript(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
* details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
* set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
* number of worker threads
* @param executionMode If the method call needs to be invoked on all workers or any single worker
* @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
* @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
* @param req Represents the request to be processed.
* @return
* @throws ApexPythonInterpreterException
public Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,
long requestId, PythonInterpreterRequest<Void> req)
throws ApexPythonInterpreterException
checkNotNull(req.getScriptExecutionRequestPayload(), "Script execution info not set");
checkNotNull(req.getScriptExecutionRequestPayload().getScriptName(), "Script name not set");
Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
PythonRequestResponse lastSuccessfullySubmittedRequest = null;
try {
if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
if ( timeOutPerWorker == 0) {
timeOutPerWorker = 1;
for ( InterpreterWrapper wrapper : workers) {
lastSuccessfullySubmittedRequest = wrapper.executeScript(windowId,requestId,req);
if (lastSuccessfullySubmittedRequest != null) {
if ( returnStatus.size() > 0) {
} else {
InterpreterWrapper currentThread = null;
if (executionMode.equals(WorkerExecutionMode.ANY)) {
currentThread = selectWorkerForCurrentCall(requestId);
if (executionMode.equals(WorkerExecutionMode.STICKY)) {
currentThread = workers.get(req.hashCode() % numWorkerThreads);
LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
if (currentThread != null) {
lastSuccessfullySubmittedRequest = currentThread.executeScript(windowId, requestId, req);
if (lastSuccessfullySubmittedRequest != null) {
returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
} else {
throw new ApexPythonInterpreterException("No free interpreter threads available." +
" Consider increasing workers and relaunch");
} catch (InterruptedException e) {
throw new ApexPythonInterpreterException(e);
return returnStatus;
private void checkNotNullConditions(PythonInterpreterRequest request)
checkNotNull(request, "Request object cannnot be null");
checkNotNull(request.getTimeout(), "Time out value not set");
checkNotNull(request.getTimeUnit(), "Time out unit not set");
* See {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
* details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
* set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
* number of worker threads
* @param executionMode If the method call needs to be invoked on all workers or any single worker
* @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
* @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
* @param request
* @param <T>
* @return
* @throws ApexPythonInterpreterException
public <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode,long windowId, long requestId,
PythonInterpreterRequest<T> request) throws ApexPythonInterpreterException
checkNotNull(request.getEvalCommandRequestPayload(), "Eval command info not set");
checkNotNull(request.getEvalCommandRequestPayload().getEvalCommand(),"Eval command not set");
Map<String,PythonRequestResponse<T>> statusOfEval = new HashMap<>();
PythonRequestResponse lastSuccessfullySubmittedRequest = null;
try {
if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(), request.getTimeUnit()) /
if ( timeOutPerWorker == 0) {
timeOutPerWorker = 1;
for ( InterpreterWrapper wrapper : workers) {
lastSuccessfullySubmittedRequest = wrapper.eval(windowId,requestId,request);
if (lastSuccessfullySubmittedRequest != null) {
statusOfEval.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
} else {
InterpreterWrapper currentThread = null;
if (executionMode.equals(WorkerExecutionMode.ANY)) {
currentThread = selectWorkerForCurrentCall(requestId);
if (executionMode.equals(WorkerExecutionMode.STICKY)) {
currentThread = workers.get(request.hashCode() % numWorkerThreads);
LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
if (currentThread != null) {
lastSuccessfullySubmittedRequest = currentThread.eval(windowId, requestId, request);
if (lastSuccessfullySubmittedRequest != null) {
statusOfEval.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
} else {
throw new ApexPythonInterpreterException("No free interpreter threads available." +
" Consider increasing workers and relaunch");
} catch (InterruptedException e) {
throw new ApexPythonInterpreterException(e);
return statusOfEval;
public void stopInterpreter() throws ApexPythonInterpreterException
for ( InterpreterWrapper wrapper : workers) {
public int getNumWorkerThreads()
return numWorkerThreads;
public void setNumWorkerThreads(int numWorkerThreads)
this.numWorkerThreads = numWorkerThreads;
public List<InterpreterWrapper> getWorkers()
return workers;
public void setWorkers(List<InterpreterWrapper> workers)
this.workers = workers;
public List<PythonRequestResponse> getCommandHistory()
return commandHistory;
public void setCommandHistory(List<PythonRequestResponse> commandHistory)
this.commandHistory = commandHistory;
public long getSleepTimeAfterInterpreterStart()
return sleepTimeAfterInterpreterStart;
public void setSleepTimeAfterInterpreterStart(long sleepTimeAfterInterpreterStart)
this.sleepTimeAfterInterpreterStart = sleepTimeAfterInterpreterStart;
public BlockingQueue<PythonRequestResponse> getDelayedResponseQueue()
return delayedResponseQueue;
public void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue)
this.delayedResponseQueue = delayedResponseQueue;
public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
return cpuSpinPolicyForWaitingInBuffer;
public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
public int getBufferCapacity()
return bufferCapacity;
public void setBufferCapacity(int bufferCapacity)
this.bufferCapacity = bufferCapacity;
public long getNumStarvedReturns()
return numStarvedReturns;
public void setNumStarvedReturns(long numStarvedReturns)
this.numStarvedReturns = numStarvedReturns;