ARIA-214 Dry execution changes the state of non implemented operations
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 78159c4..b3dfb3c 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -163,9 +163,6 @@
         self._task_id = task_model.id
         self._update_fields = None
 
-    def execute(self):
-        super(OperationTask, self).execute()
-
     @contextmanager
     def _update(self):
         """
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 0bbce90..fec108b 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -48,11 +48,7 @@
             execution_graph, dependencies, default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            if api_task.implementation:
-                operation_task = core_task.OperationTask(api_task, executor=default_executor)
-            else:
-                operation_task = core_task.OperationTask(api_task,
-                                                         executor=base.EmptyOperationExecutor())
+            operation_task = core_task.OperationTask(api_task, executor=default_executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a225837..c543278 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,13 +25,22 @@
     """
     Base class for executors for running tasks
     """
+    def _execute(self, task):
+        raise NotImplementedError
 
     def execute(self, task):
         """
         Execute a task
         :param task: task to execute
         """
-        raise NotImplementedError
+        if task.implementation:
+            self._execute(task)
+        else:
+            # In this case the task is missing an implementation. This task still gets to an
+            # executor, but since there is nothing to run, we by default simply skip the execution
+            # itself.
+            self._task_started(task)
+            self._task_succeeded(task)
 
     def close(self):
         """
@@ -52,12 +61,6 @@
         events.on_success_task_signal.send(task)
 
 
-class StubTaskExecutor(BaseExecutor):
+class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method
     def execute(self, task):
         task.status = task.SUCCESS
-
-
-class EmptyOperationExecutor(BaseExecutor):
-    def execute(self, task):
-        events.start_task_signal.send(task)
-        events.on_success_task_signal.send(task)
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 7bd9b7c..bbddc25 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -42,7 +42,7 @@
         self._receiver_thread.start()
         self._started_queue.get(timeout=30)
 
-    def execute(self, task):
+    def _execute(self, task):
         self._tasks[task.id] = task
         inputs = dict(inp.unwrap() for inp in task.inputs.values())
         inputs['ctx'] = task.context
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index eb70a41..f6fb7a6 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -21,11 +21,10 @@
 from .base import BaseExecutor
 
 
-class DryExecutor(BaseExecutor):
+class DryExecutor(BaseExecutor):                                                                    # pylint: disable=abstract-method
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
-
     def execute(self, task):
         # updating the task manually instead of calling self._task_started(task),
         # to avoid any side effects raising that event might cause
@@ -33,19 +32,20 @@
             task.started_at = datetime.utcnow()
             task.status = task.STARTED
 
-        if hasattr(task.actor, 'source_node'):
-            name = '{source_node.name}->{target_node.name}'.format(
-                source_node=task.actor.source_node, target_node=task.actor.target_node)
-        else:
-            name = task.actor.name
+        if task.implementation:
+            if hasattr(task.actor, 'source_node'):
+                name = '{source_node.name}->{target_node.name}'.format(
+                    source_node=task.actor.source_node, target_node=task.actor.target_node)
+            else:
+                name = task.actor.name
 
-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} started...'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} started...'
+                .format(name=name, task=task))
 
-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} successful'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} successful'
+                .format(name=name, task=task))
 
         # updating the task manually instead of calling self._task_succeeded(task),
         # to avoid any side effects raising that event might cause
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index f3daf04..e464f7d 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -116,7 +116,7 @@
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
-    def execute(self, task):
+    def _execute(self, task):
         self._check_closed()
         self._tasks[task.id] = task
 
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 836b2bf..f53362a 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -46,7 +46,7 @@
             thread.start()
             self._pool.append(thread)
 
-    def execute(self, task):
+    def _execute(self, task):
         self._queue.put(task)
 
     def close(self):
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 748ee20..50ca7f5 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -24,7 +24,6 @@
     api,
     core,
     exceptions,
-    executor
 )
 
 from tests import mock, storage
@@ -71,8 +70,7 @@
                 node,
                 interface_name=NODE_INTERFACE_NAME,
                 operation_name=NODE_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task,
-                                                executor=executor.base.EmptyOperationExecutor())
+            core_task = core.task.OperationTask(api_task=api_task, executor=None)
         return api_task, core_task
 
     def _create_relationship_operation_task(self, ctx, relationship):
@@ -81,8 +79,7 @@
                 relationship,
                 interface_name=RELATIONSHIP_INTERFACE_NAME,
                 operation_name=RELATIONSHIP_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task,
-                                                executor=executor.base.EmptyOperationExecutor())
+            core_task = core.task.OperationTask(api_task=api_task, executor=None)
         return api_task, core_task
 
     def test_node_operation_task_creation(self, ctx):
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index b353518..5f240b2 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 import os
 import Queue
 
@@ -66,7 +65,7 @@
     def test_closed(self, executor):
         executor.close()
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=None)
+            executor.execute(task=MockTask(implementation='some.implementation'))
         assert 'closed' in exc_info.value.message
 
 
@@ -82,18 +81,3 @@
     source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
     plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
     return plugin_manager.install(source=plugin_path)
-
-
-class MockContext(object):
-
-    def __init__(self, *args, **kwargs):
-        self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
-
-    def __getattr__(self, item):
-        return None
-
-    @classmethod
-    def deserialize_from_dict(cls, **kwargs):
-        return cls()