blob: 0a28cf22ba203655bd0e653e104bd6493c405803 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.python;
import org.apache.paimon.utils.Preconditions;
import py4j.CallbackClient;
import py4j.Gateway;
import py4j.GatewayServer;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/** The util class help to prepare Python env and run the python process. */
final class PythonEnvUtils {
static final long CHECK_INTERVAL = 100;
static final long TIMEOUT_MILLIS = 10000;
/**
* Creates a GatewayServer run in a daemon thread.
*
* @return The created GatewayServer
*/
static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
CompletableFuture<GatewayServer> gatewayServerFuture = new CompletableFuture<>();
Thread thread =
new Thread(
() -> {
try (NetUtils.Port port = NetUtils.getAvailablePort()) {
int freePort = port.getPort();
GatewayServer server =
new GatewayServer.GatewayServerBuilder()
.gateway(
new Gateway(
new ConcurrentHashMap<
String, Object>(),
new CallbackClient(freePort)))
.javaPort(0)
.build();
resetCallbackClientExecutorService(server);
gatewayServerFuture.complete(server);
server.start(true);
} catch (Throwable e) {
gatewayServerFuture.completeExceptionally(e);
}
});
thread.setName("py4j-gateway");
thread.setDaemon(true);
thread.start();
thread.join();
return gatewayServerFuture.get();
}
/**
* Reset a daemon thread to the callback client thread pool so that the callback server can be
* terminated when gate way server is shutting down. We need to shut down the none-daemon thread
* firstly, then set a new thread created in a daemon thread to the ExecutorService.
*
* @param gatewayServer the gateway which creates the callback server.
*/
private static void resetCallbackClientExecutorService(GatewayServer gatewayServer)
throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException,
InvocationTargetException {
CallbackClient callbackClient = (CallbackClient) gatewayServer.getCallbackClient();
// The Java API of py4j does not provide approach to set "daemonize_connections" parameter.
// Use reflect to daemonize the connection thread.
Field executor = CallbackClient.class.getDeclaredField("executor");
executor.setAccessible(true);
((ScheduledExecutorService) executor.get(callbackClient)).shutdown();
executor.set(callbackClient, Executors.newScheduledThreadPool(1, Thread::new));
Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
setupCleaner.setAccessible(true);
setupCleaner.invoke(callbackClient);
}
/**
* Reset the callback client of gatewayServer with the given callbackListeningAddress and
* callbackListeningPort after the callback server started.
*
* @param callbackServerListeningAddress the listening address of the callback server.
* @param callbackServerListeningPort the listening port of the callback server.
*/
public static void resetCallbackClient(
GatewayServer gatewayServer,
String callbackServerListeningAddress,
int callbackServerListeningPort)
throws UnknownHostException, InvocationTargetException, NoSuchMethodException,
IllegalAccessException, NoSuchFieldException {
gatewayServer.resetCallbackClient(
InetAddress.getByName(callbackServerListeningAddress), callbackServerListeningPort);
resetCallbackClientExecutorService(gatewayServer);
}
/**
* Py4J both supports Java to Python RPC and Python to Java RPC. The GatewayServer object is the
* entry point of Java to Python RPC. Since the Py4j Python client will only be launched only
* once, the GatewayServer object needs to be reused.
*/
private static GatewayServer gatewayServer = null;
static void setGatewayServer(GatewayServer gatewayServer) {
Preconditions.checkArgument(gatewayServer == null || PythonEnvUtils.gatewayServer == null);
PythonEnvUtils.gatewayServer = gatewayServer;
}
}