blob: d969b19787e778f5e2a701174c53faacc07412a0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.apex.malhar.python.base.jep;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
* Wraps around the interpreter thread so that time bound SLAs can be implemented for python based execution
* This class primarily implements the time constraints by utilizing the {@link InterpreterThread} class and using
* a Disruptor blocking queue for high throughput. Utilizes an executor service to implement the timing SLAs.
public class InterpreterWrapper
private static final Logger LOG = LoggerFactory.getLogger(InterpreterWrapper.class);
/* Reference to the interpreter thread which executes requests in memory */
private transient InterpreterThread interpreterThread;
/* Spin policy to use for the disruptor implementation */
private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
private int bufferCapacity = 16; // Represents the number of workers and response queue sizes
private String interpreterId;
/* Represents the actual thread instance running under the Executor service */
private transient Future<?> handleToJepRunner;
private ExecutorService executorService = Executors.newSingleThreadExecutor();
private transient BlockingQueue<PythonRequestResponse> requestQueue;
private transient BlockingQueue<PythonRequestResponse> responseQueue;
/* Represents the queue into which all of the stragglers will be pushed into by the interpreter thread */
private transient volatile BlockingQueue<PythonRequestResponse> delayedResponsesQueue;
* Constructs the interpreter wrapper instance.
* @param interpreterId A string that can be used to represent the interpreter id that is passed onto the actual
* thread that is executing the commands
* @param delayedResponsesQueueRef The queue into which all of the straggler responses will end in
public InterpreterWrapper(String interpreterId,BlockingQueue<PythonRequestResponse> delayedResponsesQueueRef,
SpinPolicy spinPolicyForWaitingInRequestQueue)
delayedResponsesQueue = delayedResponsesQueueRef;
this.interpreterId = interpreterId;
this.cpuSpinPolicyForWaitingInBuffer = spinPolicyForWaitingInRequestQueue;
requestQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
responseQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
interpreterThread = new InterpreterThread(requestQueue,responseQueue,interpreterId);
* Invokes the interpreter thread pre initialization logic
* @param preInitConfigs A set of key value pairs that are used to initialize the actual interpreter. See constants
* defined in {@link InterpreterThread} for a list of keys available
* @throws ApexPythonInterpreterException if the pre-initialization logic could not be executed for whatever reasons
public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs) throws ApexPythonInterpreterException
* Starts the actual interpreter thread to which this class is wrapping around by using an executor service
* @throws ApexPythonInterpreterException
public void startInterpreter() throws ApexPythonInterpreterException
handleToJepRunner = executorService.submit(interpreterThread);
* Builds a response object for the incoming request object.
* @param req Represents the incoming request for which response needs to be generated for.
* @param windowId The Operator window ID ( used to choose the interpreter thread while choosing to execute the
* logic from a pool of worker threads )
* @param requestId The request ID perhaps coming from the base python operator. Only used to optimize the right
* interpreter lookup from a pool of worker interpreter threads.
* @param <T> The template of the return type
* @return An object of type {@link PythonRequestResponse} is returned which encompasses both request and response.
private <T> PythonRequestResponse<T> buildRequestRespObject(PythonInterpreterRequest<T> req,
long windowId,long requestId)
PythonRequestResponse<T> requestResponse = new PythonRequestResponse();
PythonInterpreterResponse<T> response = new PythonInterpreterResponse<>(req.getExpectedReturnType());
return requestResponse;
* Handles the common logic that is common across all methods of invocation of the in-memory interpreter. Some common
* logic includes draining any stragglers, matching the request to the any of the responses that arrive in the
* response queue possibly due to previous requests
* @param requestResponse The wrapper object into which
* @param req The request that contains the timeout SLAs
* @param <T> Java templating signature
* @return A response to the original incoming request, null if the response did not arrive within given SLA.
* @throws InterruptedException if interrupted while waiting for the response queue.
public <T> PythonRequestResponse<T> processRequest(PythonRequestResponse requestResponse,
PythonInterpreterRequest<T> req) throws InterruptedException
List<PythonRequestResponse> drainedResults = new ArrayList<>();
PythonRequestResponse currentRequestWithResponse = null;
boolean isCurrentRequestProcessed = false;
long timeOutInNanos = TimeUnit.NANOSECONDS.convert(req.getTimeout(),req.getTimeUnit());
// drain any previous responses that were returned while the Apex operator is processing
LOG.debug("Draining previous request responses if any " + drainedResults.size());
for (PythonRequestResponse oldRequestResponse : drainedResults) {
// We first set a timer to see how long it actually it took for the response to arrive.
// It is possible that a response arrived due to a previous request and hence this need for the timer
// which tracks the time for the current request.
long currentStart = System.nanoTime();
long timeLeftToCompleteProcessing = timeOutInNanos;
while ( (!isCurrentRequestProcessed) && ( timeLeftToCompleteProcessing > 0 )) {
LOG.debug("Submitting the interpreter Request with time out in nanos as " + timeOutInNanos);
// ensures we are blocked till the time limit
currentRequestWithResponse = responseQueue.poll(timeOutInNanos, TimeUnit.NANOSECONDS);
timeLeftToCompleteProcessing = timeLeftToCompleteProcessing - ( System.nanoTime() - currentStart );
currentStart = System.nanoTime();
if (currentRequestWithResponse != null) {
if ( (requestResponse.getRequestId() == currentRequestWithResponse.getRequestId()) &&
(requestResponse.getWindowId() == currentRequestWithResponse.getWindowId()) ) {
isCurrentRequestProcessed = true;
} else {
} else {
LOG.debug(" Processing of request could not be completed on time");
if (isCurrentRequestProcessed) {
LOG.debug("Response could be processed within time SLA");
return currentRequestWithResponse;
} else {
LOG.debug("Response could not be processed within time SLA");
return null;
* Implements the time based SLA over the interpreters run commands implementation. See
* {@link InterpreterThread#runCommands(List)}
* @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param request The payload of the request
* @return A response object with the results of the execution. Null if the request could not be processed on time.
* @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
* queue
public PythonRequestResponse<Void> runCommands(long windowId, long requestId,
PythonInterpreterRequest<Void> request) throws InterruptedException
PythonRequestResponse requestResponse = buildRequestRespObject(request,windowId,requestId);
return processRequest(requestResponse,request);
* Implements the time based SLA over the interpreters run commands implementation. See
* {@link InterpreterThread#executeMethodCall(String, List, Class)}
* @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param request The payload of the request
* @return A response object with the results of the execution. Null if the request could not be processed on time.
* @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
* queue
public <T> PythonRequestResponse<T> executeMethodCall(long windowId, long requestId,
PythonInterpreterRequest<T> request) throws InterruptedException
PythonRequestResponse requestResponse = buildRequestRespObject(request, windowId,requestId);
return processRequest(requestResponse,request);
* Implements the time based SLA over the interpreters run commands implementation. See
* {@link InterpreterThread#executeScript(String)}
* @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param request The payload of the request
* @return A response object with the results of the execution. Null if the request could not be processed on time.
* @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
* queue
public PythonRequestResponse<Void> executeScript(long windowId,long requestId,PythonInterpreterRequest<Void> request)
throws InterruptedException
PythonRequestResponse<Void> requestResponse = buildRequestRespObject(request, windowId,requestId);
return processRequest(requestResponse,request);
* Implements the time based SLA over the interpreters run commands implementation. See
* {@link InterpreterThread#eval(String, String, Map, boolean, Class)}
* @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
* @param request The payload of the request
* @return A response object with the results of the execution. Null if the request could not be processed on time.
* @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
* queue
public <T> PythonRequestResponse<T> eval(long windowId, long requestId,PythonInterpreterRequest<T> request)
throws InterruptedException
PythonRequestResponse<T> requestResponse = buildRequestRespObject(request,windowId,requestId);
return processRequest(requestResponse,request);
* Stops the interpreter
* @throws ApexPythonInterpreterException if error while stopping the interpreter
public void stopInterpreter() throws ApexPythonInterpreterException
public InterpreterThread getInterpreterThread()
return interpreterThread;
public void setInterpreterThread(InterpreterThread interpreterThread)
this.interpreterThread = interpreterThread;
public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
return cpuSpinPolicyForWaitingInBuffer;
public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
public int getBufferCapacity()
return bufferCapacity;
public void setBufferCapacity(int bufferCapacity)
this.bufferCapacity = bufferCapacity;
public String getInterpreterId()
return interpreterId;
public void setInterpreterId(String interpreterId)
this.interpreterId = interpreterId;
public BlockingQueue<PythonRequestResponse> getRequestQueue()
return requestQueue;
public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
this.requestQueue = requestQueue;
public BlockingQueue<PythonRequestResponse> getResponseQueue()
return responseQueue;
public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
this.responseQueue = responseQueue;
public BlockingQueue<PythonRequestResponse> getDelayedResponsesQueue()
return delayedResponsesQueue;
public void setDelayedResponsesQueue(BlockingQueue<PythonRequestResponse> delayedResponsesQueue)
this.delayedResponsesQueue = delayedResponsesQueue;
public boolean isCurrentlyBusy()
return interpreterThread.isBusy();