Builder architecture
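
This change replaces the lazily instantiated ama_context singleton (and its
_LazyProxy machinery) with an explicit builder: AmaContextBuilder loads env.yml,
runtime.yml and datasets.yml from the working directory into an Environment, and
framework runtimes subclass it (here, SparkAmaContextBuilder) to produce a fully
configured AmaContext. A builder can also hand off to another framework's builder
via as_type(), provided the target builder declares a _framework_name attribute.

A minimal usage sketch of the new API, assuming the three configuration files sit
in the working directory and that 'input_file' is defined in datasets.yml (as in
the test fixtures); the master URI and app name below are illustrative, and the
chained form relies on the Spark setters returning the builder:

    from amaterasu_pyspark.runtime import AmaContext

    # Build straight from the default configuration files in the cwd.
    ama_context = AmaContext.builder().build()
    df = ama_context.get_dataset('input_file')

    # Or tweak the underlying SparkConf before building.
    ama_context = AmaContext.builder() \
        .setMaster('local[*]') \
        .set('spark.app.name', 'amaterasu-job') \
        .build()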
diff --git a/frameworks/spark/pyspark_runtime/amaterasu_pyspark/datasets.py b/frameworks/spark/pyspark_runtime/amaterasu_pyspark/datasets.py
index 837a33d..15232bb 100644
--- a/frameworks/spark/pyspark_runtime/amaterasu_pyspark/datasets.py
+++ b/frameworks/spark/pyspark_runtime/amaterasu_pyspark/datasets.py
@@ -65,8 +65,8 @@
         datastore = datastore_cls(dataset_conf, self.spark)
         return datastore
 
-    def __init__(self, spark):
-        super(DatasetManager, self).__init__()
+    def __init__(self, dataset_conf, spark):
+        super(DatasetManager, self).__init__(dataset_conf)
         self.spark = spark
         self._registered_datastores[DatasetTypes.Hive.value] = HiveDatasetLoader
         self._registered_datastores[DatasetTypes.File.value] = FileDatasetLoader
diff --git a/frameworks/spark/pyspark_runtime/amaterasu_pyspark/runtime.py b/frameworks/spark/pyspark_runtime/amaterasu_pyspark/runtime.py
index b271521..0657850 100644
--- a/frameworks/spark/pyspark_runtime/amaterasu_pyspark/runtime.py
+++ b/frameworks/spark/pyspark_runtime/amaterasu_pyspark/runtime.py
@@ -16,11 +16,12 @@
 """
 from typing import Tuple
 
-from amaterasu import conf, notifier, ImproperlyConfiguredError, BaseAmaContext
+from amaterasu import conf, ImproperlyConfiguredError, BaseAmaContext
 from pyspark import SparkContext, SparkConf
 from pyspark.sql import SparkSession, DataFrame
 
 from amaterasu.datasets import BaseDatasetManager
+from amaterasu.runtime import Environment, AmaContextBuilder
 from .datasets import DatasetManager
 
 
@@ -37,16 +38,43 @@
     return sc, spark
 
 
+class SparkAmaContextBuilder(AmaContextBuilder):
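+    # Spark-specific builder: accumulates SparkConf settings and creates the
+    # SparkSession (and the matching AmaContext) when build() is called.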
+
+    def __init__(self):
+        super().__init__()
+        self.spark_conf = SparkConf()
+
+    def setMaster(self, master_uri):
+        self.spark_conf.setMaster(master_uri)
+        return self
+
+    def set(self, key, value):
+        self.spark_conf.set(key, value)
+        return self
+
+    def build(self):
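+        # getOrCreate() reuses an already-running SparkSession (e.g. when the
+        # script is launched via spark-submit) or starts a new one from the
+        # accumulated SparkConf.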
+        spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
+        sc = spark.sparkContext
+        return AmaContext(self.env, sc, spark)
+
+
 class AmaContext(BaseAmaContext):
 
+    @classmethod
+    def builder(cls):
+        return SparkAmaContextBuilder()
+
     @property
     def dataset_manager(self) -> BaseDatasetManager:
         return self._dataset_manager
 
-    def __init__(self, sc: SparkContext = None, spark: SparkSession = None):
-        super(AmaContext, self).__init__()
+    def get_member(self, member_name):
+        return
+
+    def __init__(self, env: Environment, sc: SparkContext = None, spark: SparkSession = None):
+        super(AmaContext, self).__init__(env)
         self.sc, self.spark = _get_or_create_spark_attributes(sc, spark)
-        self._dataset_manager = DatasetManager(self.spark)
+        self._dataset_manager = DatasetManager(env.datasets, self.spark)
 
     def get_dataset(self, dataset_name: str) -> DataFrame:
         return self._dataset_manager.load_dataset(dataset_name)
@@ -55,7 +83,3 @@
         self._dataset_manager.persist_dataset(dataset_name, dataset, overwrite)
 
 
-try:
-    ama_context = AmaContext(sc, spark)  # When using spark-submit
-except NameError:
-    ama_context = AmaContext()
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/tests/runtime_tests.py b/frameworks/spark/pyspark_runtime/tests/runtime_tests.py
index 8407977..001648d 100644
--- a/frameworks/spark/pyspark_runtime/tests/runtime_tests.py
+++ b/frameworks/spark/pyspark_runtime/tests/runtime_tests.py
@@ -18,21 +18,24 @@
 
 from pyspark.sql import DataFrame
 
-from amaterasu_pyspark.runtime import ama_context
-from amaterasu.datastores import DatasetNotFoundError, DatasetTypeNotSupported
+from amaterasu_pyspark.runtime import AmaContext
+from amaterasu.datasets import DatasetNotFoundError, DatasetTypeNotSupported
 
 
 class DatastoresTests(unittest.TestCase):
 
+    def setUp(self):
+        self.ama_context = AmaContext.builder().build()
+
     def test_loading_an_existing_generic_dataset_should_not_be_implemented(self):
-        self.assertRaises(NotImplementedError,  ama_context.get_dataset, "mydataset")
+        self.assertRaises(NotImplementedError, self.ama_context.get_dataset, "mydataset")
 
     def test_loading_an_unsupported_dataset_should_raise_an_exception(self):
-        self.assertRaises(DatasetTypeNotSupported, ama_context.get_dataset, "unsupported")
+        self.assertRaises(DatasetTypeNotSupported, self.ama_context.get_dataset, "unsupported")
 
     def test_loading_a_dataset_that_is_not_defined_should_raise_an_exception(self):
-        self.assertRaises(DatasetNotFoundError, ama_context.get_dataset, "notfound")
+        self.assertRaises(DatasetNotFoundError, self.ama_context.get_dataset, "notfound")
 
     def test_load_dataset_from_file_should_return_a_dataframe(self):
-        df = ama_context.get_dataset('input_file')
+        df = self.ama_context.get_dataset('input_file')
         self.assertEquals(type(df), DataFrame)
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/tests/test_scripts/simple.py b/frameworks/spark/pyspark_runtime/tests/test_scripts/simple.py
index 8f642e9..258fa15 100644
--- a/frameworks/spark/pyspark_runtime/tests/test_scripts/simple.py
+++ b/frameworks/spark/pyspark_runtime/tests/test_scripts/simple.py
@@ -17,8 +17,9 @@
 from pyspark.sql.functions import udf
 from pyspark.sql.types import StructType, StructField, IntegerType
 
-from amaterasu.pyspark.runtime import ama_context
+from amaterasu.pyspark.runtime import AmaContext
 
+ama_context = AmaContext.builder().build()
 
 def pow(num):
     return num * num
diff --git a/sdk_python/amaterasu/__init__.py b/sdk_python/amaterasu/__init__.py
index 8ee50ab..740d6b3 100644
--- a/sdk_python/amaterasu/__init__.py
+++ b/sdk_python/amaterasu/__init__.py
@@ -17,7 +17,7 @@
 import pkg_resources
 import sys
 
-from .runtime import BaseAmaContext, conf, notifier, ImproperlyConfiguredError, _LazyProxy
+from .runtime import BaseAmaContext, conf, ImproperlyConfiguredError
 
 
 class PluginProxy:
@@ -43,7 +43,7 @@
 }
 
 
-__all__ = ['BaseAmaContext', 'conf', 'notifier', 'ImproperlyConfiguredError']
+__all__ = ['BaseAmaContext', 'conf', 'ImproperlyConfiguredError']
 
 thismodule = sys.modules[__name__]
 for plugin_name, plugin_proxy in plugins.items():
diff --git a/sdk_python/amaterasu/datasets.py b/sdk_python/amaterasu/datasets.py
index 5819f11..1408ef7 100644
--- a/sdk_python/amaterasu/datasets.py
+++ b/sdk_python/amaterasu/datasets.py
@@ -65,10 +65,9 @@
 
     _registered_datastores: Dict[str, Type[BaseDatasetLoader]] = {}
 
-    def __init__(self):
+    def __init__(self, datasets_conf):
         self._registered_datastores[DatasetTypes.Generic.value] = GenericDatasetLoader
-        with open('datasets.yml', 'r', encoding='utf-8') as f:
-            self._datasets_conf = yaml.load(f)
+        self._datasets_conf = datasets_conf
 
     def _find_dataset_config(self, dataset_name: str) -> Dict:
         for dataset_type, dataset_configurations in self._datasets_conf.items():
diff --git a/sdk_python/amaterasu/runtime.py b/sdk_python/amaterasu/runtime.py
index 6adba1f..e1fd8f1 100644
--- a/sdk_python/amaterasu/runtime.py
+++ b/sdk_python/amaterasu/runtime.py
@@ -26,10 +26,22 @@
 from amaterasu.datasets import BaseDatasetManager
 
 
+def _get_local_file_path(file_name):
+    cwd = os.getcwd()
+    return os.path.join(cwd, file_name)
+
+
 class ImproperlyConfiguredError(Exception):
     pass
 
 
+class Environment(Munch):
+    pass
+
+
+class RuntimeNotSupportedError(Exception):
+    pass
+
+
 class AmaActiveMQNotificationHandler(logging.Handler):
 
     def create_mq(self):
@@ -50,9 +62,80 @@
         self.mq.send(body=record, destination=self.queue_name)
 
 
+class AmaContextBuilder(abc.ABC):
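+    # Base builder: loads env.yml, runtime.yml and datasets.yml from the current
+    # working directory into an Environment, and discovers framework-specific
+    # builders among its subclasses.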
+
+    def __init__(self):
+        self.env_conf_path = _get_local_file_path('env.yml')
+        self.runtime_conf_path = _get_local_file_path('runtime.yml')
+        self.datasets_conf_path = _get_local_file_path('datasets.yml')
+        try:
+            self.env = self._create_env()
+        except Exception:
+            print("Could not load default env parameters!")
+            self.env = None
+        self._frameworks = self._resolve_supported_frameworks()
+
+    def _create_env(self):
+        _dict = {
+            'job_metadata': None,
+            'env': None,
+            'datasets': None
+        }
+        with open(self.env_conf_path, 'r') as f:
+            _dict['env'] = yaml.safe_load(f)
+        with open(self.runtime_conf_path, 'r') as f:
+            _dict['job_metadata'] = yaml.safe_load(f)
+        with open(self.datasets_conf_path, 'r') as f:
+            _dict['datasets'] = yaml.safe_load(f)
+        return munchify(_dict, factory=Environment)
+
+    def _resolve_supported_frameworks(self):
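+        # Frameworks opt in by subclassing AmaContextBuilder and declaring a
+        # '_framework_name' attribute; only those subclasses are reachable
+        # through as_type().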
+        supported_frameworks = {}
+        for subclass in self.__class__.__subclasses__():
+            if hasattr(subclass, '_framework_name'):
+                supported_frameworks[subclass._framework_name] = subclass
+        return supported_frameworks
+
+    def set_env_path(self, env_path):
+        self.env_conf_path = _get_local_file_path(env_path)
+        self.env = self._create_env()
+        return self
+
+    def set_runtime_path(self, runtime_path):
+        self.runtime_conf_path = _get_local_file_path(runtime_path)
+        self.env = self._create_env()
+        return self
+
+    def set_datasets_path(self, datasets_path):
+        self.datasets_conf_path = _get_local_file_path(datasets_path)
+        self.env = self._create_env()
+        return self
+
+    def as_type(self, framework_name):
+        try:
+            framework_builder_cls = self._frameworks[framework_name]
+        except KeyError:
+            raise RuntimeNotSupportedError(
+                "Runtime for '{}' is not supported, are you sure it is installed?".format(framework_name))
+        framework_builder = framework_builder_cls()
+        framework_builder.set_env_path(self.env_conf_path)
+        framework_builder.set_datasets_path(self.datasets_conf_path)
+        framework_builder.set_runtime_path(self.runtime_conf_path)
+        return framework_builder
+
+    @abc.abstractmethod
+    def build(self):
+        pass
+
+
 class BaseAmaContext(abc.ABC):
 
-    instance = None
+    def __init__(self, environment: Environment):
+        self._env = environment
+
+    @classmethod
+    @abc.abstractmethod
+    def builder(cls):
+        pass
 
     @property
     @abc.abstractmethod
@@ -65,79 +148,12 @@
     def get_dataset(self, dataset_name: str):
         self.dataset_manager.load_dataset(dataset_name)
 
-    def __new__(cls, *args, **kwargs) -> 'BaseAmaContext':
-        '''
-        This is a little ugly hack, but we need LazyProxy to implement a singleton ama_context.
-        :param args:
-        :param kwargs:
-        :return:
-        '''
-        if not cls.instance:
-            cls.instance = _LazyProxy(cls, *args, **kwargs)
-        return cls.instance
-
-
-class _LazyProxy:
-
-    instance = None
-
-    def __init__(self, cls, *args, **kwargs):
-        """
-        Utility singleton object that is really instantiated only
-        when it is first accessed. We use it to instantiate our contexts to
-        provide an easier API for users.
-
-        e.g. for the Python SDK developer:
-        Let's say that you have a class "AAAContext" for the framework named
-        "AAA", it is the SDK developer's responsibility to wrap it with
-        a LazyProxy object.
-        An example usage woul be:
-        >>> ama_context = _LazyProxy(AAAContext, *args, **kwargs)
-        At this point, the AAAContext isn't instantiated.
-        When the framework user tries to access the context for the first time,
-        only then the AAAContext is instantiated.
-        e.g. -
-        >>> ama_context.get_dataset("somename", "somevalue") <-- instance is now created.
-        >>> ama_context.get_dataset("anothername", "anothervalue") <-- instance is reused
-
-        :param cls:
-        :param args:
-        :param kwargs:
-        """
-        super(_LazyProxy, self).__setattr__('cls', cls)
-        super(_LazyProxy, self).__setattr__('args', args)
-        super(_LazyProxy, self).__setattr__('kwargs', kwargs)
-
-    def _get_or_create_instance(self):
-        instance = super(_LazyProxy, self).__getattribute__('instance')
-        if not instance:
-            cls = super(_LazyProxy, self).__getattribute__('cls')
-            args = super(_LazyProxy, self).__getattribute__('args')
-            kwargs = super(_LazyProxy, self).__getattribute__('kwargs')
-            instance = object.__new__(cls)
-            instance.__init__(*args, **kwargs)
-            super(_LazyProxy, self).__setattr__('instance', instance)
-        return instance
-
-    def __getattr__(self, item):
-        instance = super(_LazyProxy, self).__getattribute__('_get_or_create_instance')()
-        return getattr(instance, item)
-
-    def __setattr__(self, key, value):
-        instance = self._get_or_create_instance()
-        return setattr(instance, key, value)
-
-
-class Environment(Munch):
-    pass
-
 
 class Notifier(logging.Logger):
 
     def __init__(self, name, level=logging.NOTSET):
         super().__init__(name, level)
-        handler = _LazyProxy(AmaActiveMQNotificationHandler)
-        self.addHandler(handler)
+        self.addHandler(AmaActiveMQNotificationHandler())
 
 
 def _create_configuration():
@@ -154,7 +170,8 @@
 
 
 conf = _create_configuration()
 logging.setLoggerClass(Notifier)
-notifier = logging.getLogger(__name__)
-atexit.register(lambda: notifier.info('Action {} finished successfully'.format(conf.job_metadata.actionName)))
-__all__ = ['BaseAmaContext', 'conf', 'notifier']
+# notifier = logging.getLogger(__name__)
+# atexit.register(lambda: notifier.info('Action {} finished successfully'.format(conf.job_metadata.actionName)))
+__all__ = ['BaseAmaContext', 'conf']