blob: 1a4b4a4e0561446c3d0e3db3fa68b02423f27342 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import contextlib
import http.server
import os
import queue
import shutil
import subprocess
import sys
import time
import threading
from pathlib import Path
from io import BytesIO
from selenium import webdriver
class TemplateOverrider(http.server.SimpleHTTPRequestHandler):
def log_request(self, code="-", size="-"):
# don't log successful requests
return
def do_GET(self) -> bytes | None:
if self.path.endswith(PYARROW_WHEEL_PATH.name):
self.send_response(200)
self.send_header("Content-type", "application/x-zip")
self.end_headers()
with PYARROW_WHEEL_PATH.open(mode="rb") as wheel:
self.copyfile(wheel, self.wfile)
if self.path.endswith("/test.html"):
body = b"""
<!doctype html>
<html>
<head>
<script>
window.python_done_callback = undefined;
window.python_logs = [];
function capturelogs(evt) {
if ('results' in evt.data) {
if (window.python_done_callback) {
let callback = window.python_done_callback;
window.python_done_callback = undefined;
callback({result:evt.data.results});
}
}
if ('print' in evt.data) {
evt.data.print.forEach((x)=>{window.python_logs.push(x)});
}
}
window.pyworker = new Worker("worker.js");
window.pyworker.onmessage = capturelogs;
</script>
</head>
<body></body>
</html>
"""
self.send_response(200)
self.send_header("Content-type", "text/html")
self.send_header("Content-length", len(body))
self.end_headers()
self.copyfile(BytesIO(body), self.wfile)
elif self.path.endswith("/worker.js"):
body = b"""
importScripts("./pyodide.js");
onmessage = async function (e) {
const data = e.data;
if (!self.pyodide) {
self.pyodide = await loadPyodide();
}
function do_print(arg) {
let databytes = Array.from(arg);
self.postMessage({print:databytes});
return databytes.length;
}
self.pyodide.setStdout({write:do_print,isatty:data.isatty});
self.pyodide.setStderr({write:do_print,isatty:data.isatty});
await self.pyodide.loadPackagesFromImports(data.python);
let results = await self.pyodide.runPythonAsync(data.python);
self.postMessage({results});
}
"""
self.send_response(200)
self.send_header("Content-type", "application/javascript")
self.send_header("Content-length", len(body))
self.end_headers()
self.copyfile(BytesIO(body), self.wfile)
else:
return super().do_GET()
def end_headers(self):
# Enable Cross-Origin Resource Sharing (CORS)
self.send_header("Access-Control-Allow-Origin", "*")
super().end_headers()
def run_server_thread(dist_dir, q):
    """Serve ``dist_dir`` over HTTP until the process exits.

    Intended to be used as a thread target; this function never returns
    normally because it ends in ``serve_forever()``.

    Parameters
    ----------
    dist_dir : path-like
        Directory to serve. The process working directory is changed to it,
        which affects the whole process, not only this thread.
    q : queue.Queue
        Receives the server's ``server_address`` once the socket is bound so
        the launching thread learns which port was picked.
    """
    # SimpleHTTPRequestHandler serves files relative to the current
    # directory by default, hence the chdir.
    os.chdir(dist_dir)
    # Port 0 asks the OS for any free port; the real one is reported via q.
    server = http.server.HTTPServer(("", 0), TemplateOverrider)
    q.put(server.server_address)
    print(f"Starting server for {dist_dir} at: {server.server_address}")
    server.serve_forever()
@contextlib.contextmanager
def launch_server(dist_dir):
    """Start the HTTP server in a background thread and yield its address.

    Parameters
    ----------
    dist_dir : path-like
        Directory handed to ``run_server_thread`` to be served.

    Yields
    ------
    The ``server_address`` value the server thread put on the queue.

    Raises
    ------
    queue.Empty
        If the server thread does not report its address within 50 seconds.

    Notes
    -----
    No explicit shutdown happens when the ``with`` block ends:
    ``threading.Thread`` offers no ``terminate()`` method, and the thread is
    created with ``daemon=True`` so it does not keep the process alive.
    """
    q = queue.Queue()
    server_thread = threading.Thread(
        target=run_server_thread, args=[dist_dir, q], daemon=True
    )
    server_thread.start()
    address = q.get(timeout=50)
    time.sleep(0.1)  # wait to make sure server is started
    yield address
class NodeDriver:
    """Drives Pyodide inside a Node.js process by typing JavaScript on its stdin.

    The driver exposes the same method names as ``BrowserDriver`` so the
    script can use either interchangeably.
    """

    def __init__(self, hostname, port):
        """Spawn ``node`` wrapped in the ``script`` utility.

        Parameters
        ----------
        hostname, port
            Stored on the instance; not otherwise used by this class.

        Raises
        ------
        FileNotFoundError
            If ``script`` or ``node`` cannot be found on ``PATH``.
        """
        script_exe = shutil.which("script")
        node_exe = shutil.which("node")
        missing = [
            name
            for name, exe in (("script", script_exe), ("node", node_exe))
            if exe is None
        ]
        if missing:
            raise FileNotFoundError(
                f"required executable(s) not found on PATH: {', '.join(missing)}"
            )
        # NOTE(review): `script -c` presumably gives node a pseudo-terminal
        # so that it behaves as an interactive REPL - confirm.
        self.process = subprocess.Popen(
            [script_exe, "-c", node_exe],
            stdin=subprocess.PIPE,
            shell=False,
            bufsize=0,
        )
        print(self.process)
        time.sleep(0.1)  # wait for node to start
        self.hostname = hostname
        self.port = port
        self.last_ret_code = None

    @staticmethod
    def _escape_for_template_literal(code):
        """Escape ``code`` so a JS template literal reproduces it verbatim.

        Inside backticks JavaScript interprets backslash escapes, treats a
        backtick as the end of the literal and ``${`` as the start of an
        interpolation, so those three are escaped (backslash first).
        """
        return (
            code.replace("\\", "\\\\").replace("`", "\\`").replace("${", "\\${")
        )

    def load_pyodide(self, dist_dir):
        """Require ``pyodide.js`` from ``dist_dir`` and create ``pyodide``."""
        self.execute_js(
            f"""
const {{ loadPyodide }} = require('{dist_dir}/pyodide.js');
let pyodide = await loadPyodide();
"""
        )

    def clear_logs(self):
        pass  # we don't handle logs for node

    def write_stdin(self, buffer):
        """Write all of ``buffer`` (bytes) to node's stdin.

        Stops early if the child process has exited.
        """
        # because we use unbuffered IO for
        # stdout, stdin.write is also unbuffered
        # so might under-run on writes
        while len(buffer) > 0 and self.process.poll() is None:
            written = self.process.stdin.write(buffer)
            if written == len(buffer):
                break
            elif written == 0:
                # full buffer - wait
                time.sleep(0.01)
            else:
                buffer = buffer[written:]

    def execute_js(self, code, wait_for_terminate=True):
        """Send ``code`` plus a newline to node.

        ``wait_for_terminate`` is accepted for interface parity but ignored;
        nothing is read back, so the return value is always ``None``.
        """
        self.write_stdin((code + "\n").encode("utf-8"))

    def load_arrow(self):
        """Load the pyarrow wheel into Pyodide from its local path."""
        self.execute_js(f"await pyodide.loadPackage('{PYARROW_WHEEL_PATH}');")

    def execute_python(self, code, wait_for_terminate=True):
        """Run Python ``code`` in Pyodide, leaving the result in ``python_output``.

        Returns whatever ``execute_js`` returns (currently always ``None``),
        also stored in ``self.last_ret_code``.
        """
        escaped = self._escape_for_template_literal(code)
        js_code = f"""
python = `{escaped}`;
await pyodide.loadPackagesFromImports(python);
python_output = await pyodide.runPythonAsync(python);
"""
        self.last_ret_code = self.execute_js(js_code, wait_for_terminate)
        return self.last_ret_code

    def wait_for_done(self):
        """Ask node to exit with ``python_output`` and return its exit status."""
        # in node we just let it run above, then tell the REPL to exit with
        # the last Python result as status and join the process
        self.write_stdin(b"process.exit(python_output)\n")
        return self.process.wait()
class BrowserDriver:
    """Drives Pyodide in a browser web worker through a Selenium webdriver.

    The page at ``/test.html`` owns the worker; this class submits Python
    code to it and relays the worker's printed output to this process.
    """

    def __init__(self, hostname, port, driver):
        """Open the test page served at ``hostname:port`` in ``driver``."""
        self.driver = driver
        self.driver.get(f"http://{hostname}:{port}/test.html")
        self.driver.set_script_timeout(100)

    @staticmethod
    def _escape_for_template_literal(code):
        """Escape ``code`` so a JS template literal reproduces it verbatim.

        Inside backticks JavaScript interprets backslash escapes, treats a
        backtick as the end of the literal and ``${`` as the start of an
        interpolation, so those three are escaped (backslash first).
        """
        return (
            code.replace("\\", "\\\\").replace("`", "\\`").replace("${", "\\${")
        )

    def load_pyodide(self, dist_dir):
        # nothing to do: the worker loads Pyodide itself on first message
        pass

    def load_arrow(self):
        """Load the pyarrow wheel, fetched by file name from the test server."""
        self.execute_python(
            f"import pyodide_js as pjs\n"
            f"await pjs.loadPackage('{PYARROW_WHEEL_PATH.name}')\n"
        )

    def execute_python(self, code, wait_for_terminate=True):
        """Post Python ``code`` to the worker.

        With ``wait_for_terminate=True`` the call blocks until the worker
        reports a result. Otherwise it returns immediately and the result is
        later stored in ``window.python_script_done`` for ``wait_for_done``.
        """
        escaped = self._escape_for_template_literal(code)
        isatty = "true" if sys.stdout.isatty() else "false"
        if wait_for_terminate:
            self.driver.execute_async_script(
                f"""
let callback = arguments[arguments.length-1];
python = `{escaped}`;
window.python_done_callback = callback;
window.pyworker.postMessage(
{{python, isatty: {isatty}}});
"""
            )
        else:
            self.driver.execute_script(
                f"""
let python = `{escaped}`;
window.python_done_callback= (x) => {{window.python_script_done=x;}};
window.pyworker.postMessage(
{{python,isatty:{isatty}}});
"""
            )

    def clear_logs(self):
        """Discard any output the page has buffered so far."""
        self.driver.execute_script("window.python_logs = [];")

    def wait_for_done(self):
        """Relay worker output until the script finishes; return its result."""
        while True:
            # poll for console.log messages from our webworker
            # which are the output of pytest
            lines = self.driver.execute_script(
                "let temp = window.python_logs;window.python_logs=[];return temp;"
            )
            if len(lines) > 0:
                # Flush around the raw write so the bytes are not left in the
                # buffer (the script exits via os._exit, which does not flush)
                # and stay ordered after earlier print() output.
                sys.stdout.flush()
                sys.stdout.buffer.write(bytes(lines))
                sys.stdout.buffer.flush()
            done = self.driver.execute_script("return window.python_script_done;")
            if done is not None:
                value = done["result"]
                self.driver.execute_script("delete window.python_script_done;")
                return value
            time.sleep(0.1)
class ChromeDriver(BrowserDriver):
    """``BrowserDriver`` backed by a headless, sandbox-less Chrome session."""

    def __init__(self, hostname, port):
        # imported lazily so only the selected runtime's options are loaded
        from selenium.webdriver.chrome.options import Options as ChromeOptions

        chrome_options = ChromeOptions()
        for flag in ("--headless", "--no-sandbox"):
            chrome_options.add_argument(flag)
        browser = webdriver.Chrome(options=chrome_options)
        super().__init__(hostname, port, browser)
class FirefoxDriver(BrowserDriver):
    """``BrowserDriver`` backed by a headless Firefox session."""

    def __init__(self, hostname, port):
        # imported lazily so only the selected runtime's options are loaded
        from selenium.webdriver.firefox.options import Options as FirefoxOptions

        firefox_options = FirefoxOptions()
        firefox_options.add_argument("--headless")
        browser = webdriver.Firefox(options=firefox_options)
        super().__init__(hostname, port, browser)
def _load_pyarrow_in_runner(driver, wheel_name):
driver.load_arrow()
driver.execute_python(
"""import sys
import micropip
if "pyarrow" not in sys.modules:
await micropip.install("hypothesis")
import pyodide_js as pjs
await pjs.loadPackage("numpy")
await pjs.loadPackage("pandas")
import pytest
import pandas # import pandas after pyarrow package load for pandas/pyarrow
# functions to work
import pyarrow
""",
wait_for_terminate=True,
)
# Command-line entry point: serve the Pyodide distribution, start the chosen
# runtime, load pyarrow into it and run pyarrow's pytest suite there.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-d",
    "--dist-dir",
    type=str,
    help="Pyodide distribution directory",
    default="./pyodide",
)
parser.add_argument("wheel", type=str, help="Wheel to run tests from")
# NOTE(review): --test-submodule is parsed but not read anywhere below.
parser.add_argument(
    "-t", "--test-submodule", help="Submodule that tests live in", default="test"
)
parser.add_argument(
    "-r",
    "--runtime",
    type=str,
    choices=["chrome", "node", "firefox"],
    help="Runtime to run tests in",
    default="chrome",
)
args = parser.parse_args()

# Module-level on purpose: the request handler and the drivers read
# PYARROW_WHEEL_PATH as a global.
PYARROW_WHEEL_PATH = Path(args.wheel).resolve()
dist_dir = Path(os.getcwd(), args.dist_dir).resolve()
print(f"dist dir={dist_dir}")

with launch_server(dist_dir) as (hostname, port):
    # argparse's `choices` guarantees that one of these branches matches
    if args.runtime == "chrome":
        driver = ChromeDriver(hostname, port)
    elif args.runtime == "node":
        driver = NodeDriver(hostname, port)
    elif args.runtime == "firefox":
        driver = FirefoxDriver(hostname, port)

    print("Load pyodide in browser")
    driver.load_pyodide(dist_dir)
    print("Load pyarrow in browser")
    _load_pyarrow_in_runner(driver, Path(args.wheel).name)
    driver.clear_logs()
    print("Run pytest in browser")
    driver.execute_python(
        """
import pyarrow,pathlib
pyarrow_dir = pathlib.Path(pyarrow.__file__).parent
pytest.main([pyarrow_dir, '-v'])
""",
        wait_for_terminate=False,
    )
    print("Wait for done")
    exit_code = driver.wait_for_done()
    # os._exit() terminates immediately without flushing stdio buffers, so
    # flush explicitly or the tail of the test output can be lost when
    # stdout/stderr are redirected to a pipe or file.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(exit_code)