added support for basic python and pandas, pandas not fully integrated.
diff --git a/sdk_python/amaterasu/__init__.py b/sdk_python/amaterasu/__init__.py
index 293fd6d..3846c8b 100644
--- a/sdk_python/amaterasu/__init__.py
+++ b/sdk_python/amaterasu/__init__.py
@@ -13,41 +13,4 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-"""
-import pkg_resources
-import sys
-
-from .runtime import BaseAmaContext, ImproperlyConfiguredError, LoaderAmaContext, BaseDatasetManager, BaseAmaContextBuilder
-
-
-class PluginProxy:
-
- def __init__(self, entry_point):
- self.entry_point = entry_point
- self.module = None
-
- def __getattr__(self, item):
- module = super(PluginProxy, self).__getattribute__('module')
- if not module:
- entry_point = super(PluginProxy, self).__getattribute__('entry_point')
- module = entry_point.load()
- super(PluginProxy, self).__setattr__('module', module)
- return getattr(module, item)
-
-
-
-plugins = {
- entry_point.name: PluginProxy(entry_point)
- for entry_point
- in pkg_resources.iter_entry_points('amaterasu.plugins')
-}
-
-
-__all__ = ['BaseAmaContext', 'AmaContextBuilder', 'BaseDatasetManager', 'LoaderAmaContext', 'ImproperlyConfiguredError']
-
-thismodule = sys.modules[__name__]
-for plugin_name, plugin_proxy in plugins.items():
- setattr(thismodule, plugin_name, plugin_proxy)
- module_name = 'amaterasu.{}'.format(plugin_name)
- sys.modules[module_name] = plugin_proxy
- __all__.append(plugin_name)
\ No newline at end of file
+"""
\ No newline at end of file
diff --git a/sdk_python/amaterasu/base/__init__.py b/sdk_python/amaterasu/base/__init__.py
new file mode 100644
index 0000000..df5487c
--- /dev/null
+++ b/sdk_python/amaterasu/base/__init__.py
@@ -0,0 +1,2 @@
+from .datasets import BaseDatasetManager, BaseDatasetLoader, DatasetTypes
+from .runtime import BaseAmaContextBuilder, BaseAmaContext, LoaderAmaContext
diff --git a/sdk_python/amaterasu/datasets.py b/sdk_python/amaterasu/base/datasets.py
similarity index 99%
rename from sdk_python/amaterasu/datasets.py
rename to sdk_python/amaterasu/base/datasets.py
index 4324ee6..26d6cfc 100644
--- a/sdk_python/amaterasu/datasets.py
+++ b/sdk_python/amaterasu/base/datasets.py
@@ -14,7 +14,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
-import yaml
import enum
import abc
from typing import Dict, Any, Type
diff --git a/sdk_python/amaterasu/runtime.py b/sdk_python/amaterasu/base/runtime.py
similarity index 97%
rename from sdk_python/amaterasu/runtime.py
rename to sdk_python/amaterasu/base/runtime.py
index cbc85bc..b6861c8 100644
--- a/sdk_python/amaterasu/runtime.py
+++ b/sdk_python/amaterasu/base/runtime.py
@@ -20,8 +20,8 @@
import os
import abc
from munch import Munch, munchify
-from typing import Any, Dict
-from amaterasu.datasets import BaseDatasetManager, DatasetTypes
+from typing import Any
+from amaterasu.base import BaseDatasetManager
logger = logging.root
formatter = logging.Formatter()
@@ -145,6 +145,9 @@
def builder(cls):
pass
+ @property
+ def env(self):
+ return self._ama_conf['env']
class LoaderAmaContext(BaseAmaContext, abc.ABC):
@property
diff --git a/sdk_python/amaterasu/pyspark/py.typed b/sdk_python/amaterasu/pandas/__init__.py
similarity index 100%
copy from sdk_python/amaterasu/pyspark/py.typed
copy to sdk_python/amaterasu/pandas/__init__.py
diff --git a/sdk_python/amaterasu/pandas/datasets.py b/sdk_python/amaterasu/pandas/datasets.py
new file mode 100644
index 0000000..096532a
--- /dev/null
+++ b/sdk_python/amaterasu/pandas/datasets.py
@@ -0,0 +1,134 @@
+"""
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+import os
+
+import pandas as pd
+from pyhive import hive
+from typing import Type, Dict, Any
+
+from amaterasu.base.datasets import BaseDatasetLoader, BaseDatasetManager, DatasetTypes
+
+
+class HiveDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self) -> pd.DataFrame:
+ hive_db = hive.connect(host=self.dataset_conf['host'], port=self.dataset_conf.get('port', 10000))
+ with hive_db.cursor() as cursor:
+ cursor.execute("SELECT * FROM {}".format(self.dataset_conf['table']))
+ data = list(cursor.fetchall())
+ return pd.DataFrame.from_records(data)
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool = False, keep_index=False):
+ hive_db = hive.connect(host=self.dataset_conf['host'], port=self.dataset_conf.get('port', 10000))
+ with hive_db.cursor() as cursor:
+ if overwrite:
+ cursor.execute('DROP TABLE {}'.format(self.dataset_conf['table']))
+ dataset.to_records(index=keep_index)
+ dataset.write.saveAsTable(self.dataset_conf['table'])
+
+
+class _FileCSVDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self, *args, **kwargs) -> pd.DataFrame:
+ separator = self.dataset_conf.get('separator', ',')
+ return pd.read_csv(self.dataset_conf['uri'], sep=separator)
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ separator = self.dataset_conf.get('separator', ',')
+ dataset.to_csv(self.dataset_conf['conf'], sep=separator)
+
+
+class _FileJSONDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self, *args, **kwargs) -> pd.DataFrame:
+ orient = self.dataset_conf.get('orient')
+ return pd.read_json(self.dataset_conf['uri'], orient=orient)
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ orient = self.dataset_conf.get('orient')
+ dataset.to_json(self.dataset_conf['uri'], orient=orient)
+
+
+class _FileExcelDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self, *args, **kwargs) -> pd.DataFrame:
+ worksheet = self.dataset_conf['worksheet']
+ return pd.read_excel(self.dataset_conf['uri'], sheet_name=worksheet)
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ worksheet = self.dataset_conf['worksheet']
+ dataset.to_excel(self.dataset_conf['uri'], sheet_name=worksheet)
+
+
+class _FileParquetDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self, *args, **kwargs) -> pd.DataFrame:
+ return pd.read_parquet(self.dataset_conf['uri'])
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ dataset.to_parquet(self.dataset_conf['uri'])
+
+
+class _FilePickleDatasetLoader(BaseDatasetLoader):
+
+ def load_dataset(self, *args, **kwargs) -> pd.DataFrame:
+ return pd.read_pickle(self.dataset_conf['uri'])
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ dataset.to_pickle(self.dataset_conf['uri'])
+
+
+class FileDatasetLoader(BaseDatasetLoader):
+
+ _registered_loaders : Dict[str, Type[BaseDatasetLoader]] = {
+ 'json': _FileJSONDatasetLoader,
+ 'excel': _FileExcelDatasetLoader,
+ 'csv': _FileCSVDatasetLoader,
+ 'pickle': _FilePickleDatasetLoader,
+ 'parquet': _FileParquetDatasetLoader
+ }
+
+ def __init__(self, dataset_conf: Dict):
+ super().__init__(dataset_conf)
+ try:
+ self._concrete_loader = self._registered_loaders[self.dataset_conf['format']](self.dataset_conf)
+ except KeyError:
+ raise NotImplemented(
+ "File with format '{}' is not supported, please handle it manually".format(self.dataset_conf['format']))
+
+ def load_dataset(self) -> pd.DataFrame:
+ return self._concrete_loader.load_dataset()
+
+ def persist_dataset(self, dataset: pd.DataFrame, overwrite: bool):
+ if not overwrite and os.path.exists(self.dataset_conf['uri']):
+ raise IOError("File with path '{}' already exists.".format(self.dataset_conf['uri']))
+ else:
+ self._concrete_loader.persist_dataset(dataset, overwrite)
+
+
+class DatasetManager(BaseDatasetManager):
+
+ def get_datastore(self, datastore_cls: Type[BaseDatasetLoader], dataset_conf: Dict):
+ datastore = datastore_cls(dataset_conf)
+ return datastore
+
+ def __init__(self, dataset_conf, spark):
+ super(DatasetManager, self).__init__(dataset_conf)
+ self.spark = spark
+ self._registered_datastores[DatasetTypes.Hive.value] = HiveDatasetLoader
+ self._registered_datastores[DatasetTypes.File.value] = FileDatasetLoader
+
diff --git a/sdk_python/amaterasu/pandas/runtime.py b/sdk_python/amaterasu/pandas/runtime.py
new file mode 100644
index 0000000..0863857
--- /dev/null
+++ b/sdk_python/amaterasu/pandas/runtime.py
@@ -0,0 +1,35 @@
+"""
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+from amaterasu.base import BaseDatasetManager
+from amaterasu.base import LoaderAmaContext, BaseAmaContextBuilder
+
+
+class AmaContextBuilder(BaseAmaContextBuilder):
+
+ def build(self):
+ return AmaContext(self.ama_conf)
+
+
+class AmaContext(LoaderAmaContext):
+
+ @property
+ def dataset_manager(self) -> BaseDatasetManager:
+ pass
+
+ @classmethod
+ def builder(cls):
+ return AmaContextBuilder()
diff --git a/sdk_python/amaterasu/pyspark/py.typed b/sdk_python/amaterasu/pyspark/__init__.py
similarity index 100%
rename from sdk_python/amaterasu/pyspark/py.typed
rename to sdk_python/amaterasu/pyspark/__init__.py
diff --git a/sdk_python/amaterasu/pyspark/__init__.pyi b/sdk_python/amaterasu/pyspark/__init__.pyi
deleted file mode 100644
index 1898f03..0000000
--- a/sdk_python/amaterasu/pyspark/__init__.pyi
+++ /dev/null
@@ -1,4 +0,0 @@
-# Stubs for amaterasu_pyspark (Python 3)
-#
-# NOTE: This dynamically typed stub was automatically generated by stubgen.
-
diff --git a/sdk_python/amaterasu/pyspark/datasets.py b/sdk_python/amaterasu/pyspark/datasets.py
new file mode 100644
index 0000000..5532d4e
--- /dev/null
+++ b/sdk_python/amaterasu/pyspark/datasets.py
@@ -0,0 +1,73 @@
+"""
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+from abc import ABC
+from pyspark.sql import SparkSession, DataFrame
+from typing import Type, Dict
+
+from amaterasu.base import BaseDatasetLoader, BaseDatasetManager, DatasetTypes
+
+
+class BaseSparkDatasetLoader(BaseDatasetLoader, ABC):
+
+ def __init__(self, dataset_conf: Dict, spark: SparkSession):
+ super(BaseSparkDatasetLoader, self).__init__(dataset_conf)
+ self.spark = spark
+
+
+class HiveDatasetLoader(BaseSparkDatasetLoader):
+
+ def load_dataset(self) -> DataFrame:
+ return self.spark.sql("SELECT * FROM {}".format(self.dataset_conf['table']))
+
+ def persist_dataset(self, dataset: DataFrame, overwrite: bool = False):
+ if overwrite:
+ self.spark.sql('DROP TABLE {}'.format(self.dataset_conf['table']))
+ dataset.write.saveAsTable(self.dataset_conf['table'])
+
+
+class FileDatasetLoader(BaseSparkDatasetLoader):
+
+ def load_dataset(self) -> DataFrame:
+ return self.spark\
+ .read\
+ .format(self.dataset_conf['format'])\
+ .load(self.dataset_conf['uri'])
+
+ def persist_dataset(self, dataset: DataFrame, overwrite: bool):
+ if overwrite:
+ dataset.write \
+ .mode('overwrite') \
+ .format(self.dataset_conf['format']) \
+ .save(self.dataset_conf['uri'])
+ else:
+ dataset.write\
+ .format(self.dataset_conf['format']) \
+ .save(self.dataset_conf['uri'])
+
+
+class DatasetManager(BaseDatasetManager):
+
+ def get_datastore(self, datastore_cls: Type[BaseSparkDatasetLoader], dataset_conf: Dict):
+ datastore = datastore_cls(dataset_conf, self.spark)
+ return datastore
+
+ def __init__(self, dataset_conf, spark):
+ super(DatasetManager, self).__init__(dataset_conf)
+ self.spark = spark
+ self._registered_datastores[DatasetTypes.Hive.value] = HiveDatasetLoader
+ self._registered_datastores[DatasetTypes.File.value] = FileDatasetLoader
+
diff --git a/sdk_python/amaterasu/pyspark/datasets.pyi b/sdk_python/amaterasu/pyspark/datasets.pyi
deleted file mode 100644
index 9ae4e28..0000000
--- a/sdk_python/amaterasu/pyspark/datasets.pyi
+++ /dev/null
@@ -1,25 +0,0 @@
-# Stubs for amaterasu_pyspark.datasets (Python 3)
-#
-# NOTE: This dynamically typed stub was automatically generated by stubgen.
-
-from abc import ABC
-from amaterasu.datasets import BaseDatasetLoader, BaseDatasetManager
-from pyspark.sql import DataFrame, SparkSession
-from typing import Any, Dict, Type
-
-class BaseSparkDatasetLoader(BaseDatasetLoader, ABC):
- spark: Any = ...
- def __init__(self, dataset_conf: Dict, spark: SparkSession) -> Any: ...
-
-class HiveDatasetLoader(BaseSparkDatasetLoader):
- def load_dataset(self) -> DataFrame: ...
- def persist_dataset(self, dataset: DataFrame, overwrite: bool=...) -> Any: ...
-
-class FileDatasetLoader(BaseSparkDatasetLoader):
- def load_dataset(self) -> DataFrame: ...
- def persist_dataset(self, dataset: DataFrame, overwrite: bool) -> Any: ...
-
-class DatasetManager(BaseDatasetManager):
- def get_datastore(self, datastore_cls: Type[BaseSparkDatasetLoader], dataset_conf: Dict) -> Any: ...
- spark: Any = ...
- def __init__(self, dataset_conf: Any, spark: Any) -> None: ...
diff --git a/sdk_python/amaterasu/pyspark/runtime.py b/sdk_python/amaterasu/pyspark/runtime.py
new file mode 100644
index 0000000..d7ad20c
--- /dev/null
+++ b/sdk_python/amaterasu/pyspark/runtime.py
@@ -0,0 +1,82 @@
+"""
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+from pyspark import SparkContext, SparkConf
+from pyspark.sql import SparkSession, DataFrame
+from amaterasu.base import BaseAmaContextBuilder, LoaderAmaContext
+from .datasets import DatasetManager
+import os
+import sys
+
+# For some reason, the leader passes "_" as the PYSPARK_PYTHON env variable
+os.environ['PYSPARK_PYTHON'] = os.environ.get('_') or os.environ.get('PYSPARK_PYTHON') or sys.executable
+
+
+class AmaContextBuilder(BaseAmaContextBuilder):
+
+ def __init__(self):
+ super().__init__()
+ if self.ama_conf:
+ self.spark_conf = SparkConf()\
+ .setAppName('amaterasu-{}-{}'.format(self.ama_conf.runtime.jobId, self.ama_conf.runtime.actionName))\
+ .setMaster(self.ama_conf.env.master)
+ else:
+ self.spark_conf = SparkConf()
+
+ def setMaster(self, master_uri) -> "AmaContextBuilder":
+ self.spark_conf.setMaster(master_uri)
+ return self
+
+ def set(self, key, value) -> "AmaContextBuilder":
+ self.spark_conf.set(key, value)
+ return self
+
+ def build(self) -> "AmaContext":
+ spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
+ sc = spark.sparkContext
+ return AmaContext(self.ama_conf, sc, spark)
+
+
+class AmaContext(LoaderAmaContext):
+
+ @classmethod
+ def builder(cls) -> AmaContextBuilder:
+ return AmaContextBuilder()
+
+ @property
+ def dataset_manager(self) -> DatasetManager:
+ return self._dataset_manager
+
+ @property
+ def sc(self) -> SparkContext:
+ return self._sc
+
+ @property
+ def spark(self) -> SparkSession:
+ return self._spark
+
+ def __init__(self, ama_conf, sc: SparkContext = None, spark: SparkSession = None):
+ super(AmaContext, self).__init__(ama_conf)
+ self._sc, self._spark = sc, spark
+ self._dataset_manager = DatasetManager(ama_conf.datasets, self.spark)
+
+ def get_dataset(self, dataset_name: str) -> DataFrame:
+ return self._dataset_manager.load_dataset(dataset_name)
+
+ def persist(self, dataset_name: str, dataset: DataFrame, overwrite: bool = True):
+ self._dataset_manager.persist_dataset(dataset_name, dataset, overwrite)
+
+
diff --git a/sdk_python/amaterasu/pyspark/runtime.pyi b/sdk_python/amaterasu/pyspark/runtime.pyi
deleted file mode 100644
index fbf9184..0000000
--- a/sdk_python/amaterasu/pyspark/runtime.pyi
+++ /dev/null
@@ -1,30 +0,0 @@
-# Stubs for amaterasu_pyspark.runtime (Python 3)
-#
-# NOTE: This dynamically typed stub was automatically generated by stubgen.
-
-from .datasets import DatasetManager
-from amaterasu.datasets import BaseDatasetManager
-from amaterasu.runtime import BaseAmaContextBuilder, LoaderAmaContext
-from pyspark import SparkContext
-from pyspark.sql import DataFrame, SparkSession
-from typing import Any
-
-class AmaContextBuilder(BaseAmaContextBuilder):
- spark_conf: Any = ...
- def __init__(self) -> None: ...
- def setMaster(self, master_uri: Any) -> AmaContextBuilder: ...
- def set(self, key: Any, value: Any) -> AmaContextBuilder: ...
- def build(self) -> AmaContext: ...
-
-class AmaContext(LoaderAmaContext):
- @classmethod
- def builder(cls: Any) -> AmaContextBuilder: ...
- @property
- def dataset_manager(self) -> BaseDatasetManager: ...
- @property
- def sc(self) -> SparkContext: ...
- @property
- def spark(self) -> SparkSession: ...
- def __init__(self, ama_conf: Any, sc: SparkContext=..., spark: SparkSession=...) -> Any: ...
- def get_dataset(self, dataset_name: str) -> DataFrame: ...
- def persist(self, dataset_name: str, dataset: DataFrame, overwrite: bool=...) -> Any: ...
diff --git a/sdk_python/setup.py b/sdk_python/setup.py
index 3d59dcb..526ae94 100644
--- a/sdk_python/setup.py
+++ b/sdk_python/setup.py
@@ -32,9 +32,12 @@
description='Apache Amaterasu (incubating) is an open source, configuration managment and deployment framework for big data pipelines',
python_requires='>=3.4.*, <4',
install_requires=['stomp.py', 'pyYaml', 'munch'],
- package_data={
- 'amaterasu.pyspark': ['py.typed']
- },
+ data_files=[
+ (
+ 'shared/typehints/python{}.{}'.format(*sys.version_info[:2]),
+ pathlib.Path('./amaterasu').glob('**/*.pyi'),
+ ),
+ ],
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Console',