blob: c3472a784d693f6df2cb1ebab259fda357654d80 [file] [log] [blame]
import uuid
from copy import copy
class Connectable(object):
def __init__(self,
name=None,
auto_terminate=None):
self.uuid = uuid.uuid4()
if name is None:
self.name = str(self.uuid)
else:
self.name = name
if auto_terminate is None:
self.auto_terminate = []
else:
self.auto_terminate = auto_terminate
self.connections = {}
self.out_proc = self
self.drop_empty_flowfiles = False
def connect(self, connections):
for rel in connections:
# Ensure that rel is not auto-terminated
if rel in self.auto_terminate:
del self.auto_terminate[self.auto_terminate.index(rel)]
# Add to set of output connections for this rel
if rel not in self.connections:
self.connections[rel] = []
self.connections[rel].append(connections[rel])
return self
def __rshift__(self, other):
"""
Right shift operator to support flow DSL, for example:
GetFile('/input') >> LogAttribute() >> PutFile('/output')
"""
connected = copy(self)
connected.connections = copy(self.connections)
if self.out_proc is self:
connected.out_proc = connected
else:
connected.out_proc = copy(connected.out_proc)
if isinstance(other, tuple):
if isinstance(other[0], tuple):
for rel_tuple in other:
rel = {rel_tuple[0]: rel_tuple[1]}
connected.out_proc.connect(rel)
else:
rel = {other[0]: other[1]}
connected.out_proc.connect(rel)
else:
connected.out_proc.connect({'success': other})
connected.out_proc = other
return connected
def __invert__(self):
"""
Invert operation to set empty file filtering on incoming connections
GetFile('/input') >> ~LogAttribute()
"""
self.drop_empty_flowfiles = True
return self