blob: 6370e32183a7833c6fd5c9a0586f9fd8d16eb2cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.python.base;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
/**
* Defines the methods that needs to be implemented by the Python Engine implementations. The first implementation
* takes the approach of in-memory interpreter using JEP. Other possibilities are using Py4J which is an inter process
* communication model. An Apex operator would use an instance of the Python engine implementations to run
* python code using the chosen engine.
*/
public interface ApexPythonEngine
{
/***
* Used to perform any pre interpreter processing.
* @param preInitConfigs The configuration that is going to be used by the interpreter
* @throws ApexPythonInterpreterException if there is an issue in executing the pre interpreter logic
*/
void preInitInterpreter(Map<PythonInterpreterConfig,Object> preInitConfigs) throws ApexPythonInterpreterException;
/***
* Starts the interpreter.
* @throws ApexPythonInterpreterException if library not locatable or any other issue starting the interpreter
*/
void startInterpreter() throws ApexPythonInterpreterException;
/***
* Used to perform any logic that needs to be executed after the interpreter is started but before any tuples start
* getting processed. Example, setting the starting state of the variables that are used in tuple processing.
* @throws ApexPythonInterpreterException
*/
void postStartInterpreter() throws ApexPythonInterpreterException;
/***
* Runs a series of commands. The implementation engine could make use of a worker pool to execute the command.
* @param executionMode Whether these commands need to be run on all worker thread or any of the worker thread.Please see
* * {@link WorkerExecutionMode} for choices available
* @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
* we want to implement a sticky worker in the near future. This will allow for a basic approach to
* route the command/s to the same worker if the application logic needs it to be. In the case of ANY
* worker logic, the window ID along with the Request ID is used to implement a round robin approach
* to select the next worker. Note that Sticky worker might be required since python interpreter
* state is accumulated over as the commands run and a command can reference a variable created in a
* previous command etc. Such references might want to route all commands to a specific interpreter
* instance. If the Apex python engine is not being used by an operator implementation directly,
* the caller can pass in any number as it is not used in anything more than selecting a worker from a
* worker pool.
* @param requestId The parameter is used to select a worker from the
* worker pool along with the window Id. If the Apex python engine is not being used by an
* operator implementation directly, the caller can pass in any number as it is not used in anything
* more than selecting a worker from a worker pool.
* @param request Represents the request to be processed.
* @return A map with key as the command run and boolean as the value. True represents that the command successfully
* run.
* @throws ApexPythonInterpreterException if interrupted or if the command cannot be executed
*/
Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode, long windowId, long requestId,
PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
/***
* Executes a method call
* @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
* * {@link WorkerExecutionMode} for choices available
* @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
* we want to implement a sticky worker in the near future. This will allow for a basic approach to
* route the command/s to the same worker if the application logic needs it to be. In the case of ANY
* worker logic, the window ID along with the Request ID is used to implement a round robin approach
* to select the next worker. Note that Sticky worker might be required since python interpreter
* state is accumulated over as the commands run and a command can reference a variable created in a
* previous command etc. Such references might want to route all commands to a specific interpreter
* instance. If the Apex python engine is not being used by an operator implementation directly,
* the caller can pass in any number as it is not used in anything more than selecting a worker from a
* worker pool.
* @param requestId The parameter is used to select a worker from the
* worker pool along with the window Id. If the Apex python engine is not being used by an
* operator implementation directly, the caller can pass in any number as it is not used in anything
* more than selecting a worker from a worker pool.
* @param req Represents the request to be processed.
* @param <T>
* @return A map containing the worker ID as key and boolean as successful or not
* @throws ApexPythonInterpreterException
*/
<T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
/***
* Executes a script that is locatable via a file path
* @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
* {@link WorkerExecutionMode} for choices available
* @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
* we want to implement a sticky worker in the near future. This will allow for a basic approach to
* route the command/s to the same worker if the application logic needs it to be. In the case of ANY
* worker logic, the window ID along with the Request ID is used to implement a round robin approach
* to select the next worker. Note that Sticky worker might be required since python interpreter
* state is accumulated over as the commands run and a command can reference a variable created in a
* previous command etc. Such references might want to route all commands to a specific interpreter
* instance. If the Apex python engine is not being used by an operator implementation directly,
* the caller can pass in any number as it is not used in anything more than selecting a worker from a
* worker pool.
* @param requestId The parameter is used to select a worker from the
* worker pool along with the window Id. If the Apex python engine is not being used by an
* operator implementation directly, the caller can pass in any number as it is not used in anything
* more than selecting a worker from a worker pool.
* @param request Represents the request to be processed.
* @return A map containing the worker ID as key and boolean as successful or not
* @throws ApexPythonInterpreterException
*/
Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,long requestId,
PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
/***
* Evaluates a string as a python expression and also supports passing in variables from JVM to the python interpreter
* @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
* * {@link WorkerExecutionMode} for choices available
* @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
* we want to implement a sticky worker in the near future. This will allow for a basic approach to
* route the command/s to the same worker if the application logic needs it to be. In the case of ANY
* worker logic, the window ID along with the Request ID is used to implement a round robin approach
* to select the next worker. Note that Sticky worker might be required since python interpreter
* state is accumulated over as the commands run and a command can reference a variable created in a
* previous command etc. Such references might want to route all commands to a specific interpreter
* instance. If the Apex python engine is not being used by an operator implementation directly,
* the caller can pass in any number as it is not used in anything more than selecting a worker from a
* worker pool.
* @param requestId The parameter is used to select a worker from the
* worker pool along with the window Id. If the Apex python engine is not being used by an
* operator implementation directly, the caller can pass in any number as it is not used in anything
* more than selecting a worker from a worker pool.
* @param req Represents the request to be processed.
* @param <T> Java templating signature
* @return A map containing the worker ID as key and boolean as successful or not
* @throws ApexPythonInterpreterException
*/
<T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode, long windowId, long requestId,
PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
/***
* @return The queue that holds all of the straggler responses.
*/
BlockingQueue<PythonRequestResponse> getDelayedResponseQueue();
void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue);
/***
* @return The number of times the engine could not process a request as there were no free worker threads and hence
* had to return null
*/
long getNumStarvedReturns();
void setNumStarvedReturns(long numStarvedReturns);
/**
* Returns all of the commands that were executed on all of the worker nodes.
* @return History of all commands executed in sequence
*/
List<PythonRequestResponse> getCommandHistory();
void setCommandHistory(List<PythonRequestResponse> commandHistory);
void stopInterpreter() throws ApexPythonInterpreterException;
}