Install action requirements inside the driver and ship them to the executors
diff --git a/sdk_python/amaterasu/pyspark/runtime.py b/sdk_python/amaterasu/pyspark/runtime.py
index da69205..a8fbe3c 100644
--- a/sdk_python/amaterasu/pyspark/runtime.py
+++ b/sdk_python/amaterasu/pyspark/runtime.py
@@ -18,9 +18,11 @@
from pyspark.sql import SparkSession, DataFrame
from amaterasu.base import BaseAmaContextBuilder, LoaderAmaContext
from .datasets import DatasetManager
+from pip._internal import main as pip_main
import os
import sys
+
# For some reason, the leader passes "_" as the PYSPARK_PYTHON env variable
os.environ['PYSPARK_PYTHON'] = os.environ.get('_') or os.environ.get('PYSPARK_PYTHON') or sys.executable
@@ -34,6 +36,7 @@
except:
self.spark_conf = SparkConf()
+
def setMaster(self, master_uri) -> "AmaContextBuilder":
self.spark_conf.setMaster(master_uri)
return self
@@ -42,9 +45,17 @@
self.spark_conf.set(key, value)
return self
+ def prepare_user_dependencies(self):
+ pip_main(["download", "-r", "requirements.txt", '-d', 'job_deps'])
+ return [os.path.join('job_deps', fname) for fname in os.listdir('job_deps')]
+
+
def build(self) -> "AmaContext":
+ deps_paths = self.prepare_user_dependencies()
spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
- sc = spark.sparkContext
+ sc: SparkContext = spark.sparkContext
+ for path in deps_paths:
+ sc.addPyFile(path)
return AmaContext(self.ama_conf, sc, spark)