################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from flink.connection import Connection
from flink.connection import Collector
from flink.connection import Iterator
from flink.plan.DataSet import DataSet
from flink.plan.Constants import _Identifier
from flink.plan.OperationInfo import OperationInfo
from flink.utilities import Switch
import socket as SOCKET
import copy
import sys
from struct import pack
class EnvironmentContainer(object):
"""Keeps track of which ExecutionEnvironment is active."""
_environment_counter = 0
_environment_id_to_execute = None
_plan_mode = None
def create_environment(self):
"""Creates a new environment with a unique id."""
env = Environment(self, self._environment_counter)
self._environment_counter += 1
return env
def is_planning(self):
"""
Checks whether we are generating the plan or executing an operator.
:return: True if the plan is being generated, False otherwise.
"""
if self._plan_mode is None:
mode = sys.stdin.readline().rstrip('\n')
if mode == "plan":
self._plan_mode = True
elif mode == "operator":
self._plan_mode = False
else:
raise ValueError("Invalid mode specified: " + mode)
return self._plan_mode
def should_execute(self, environment):
"""
Checks whether the given ExecutionEnvironment should run the contained plan.
:param environment: The ExecutionEnvironment to check.
:return: True if the environment should run the contained plan, False otherwise.
"""
if self._environment_id_to_execute is None:
self._environment_id_to_execute = int(sys.stdin.readline().rstrip('\n'))
return environment._env_id == self._environment_id_to_execute
container = EnvironmentContainer()
def get_environment():
"""
Creates an execution environment that represents the context in which the program is currently executed.
:return: The execution environment of the context in which the program is executed.
"""
return container.create_environment()
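# A minimal usage sketch (assuming the script is launched by Flink's Python
# planner, which supplies the stdin handshake consumed by EnvironmentContainer;
# output() is the print sink defined in flink.plan.DataSet):
#
#   env = get_environment()
#   env.from_elements(1, 2, 3).output()
#   env.execute(local=True)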
class Environment(object):
def __init__(self, container, env_id):
# util
self._counter = 0
#parameters
self._dop = -1
self._local_mode = False
self._retry = 0
self._container = container
self._env_id = env_id
#sets
self._sources = []
self._sets = []
self._sinks = []
#specials
self._broadcast = []
self._types = []
def register_type(self, type, serializer, deserializer):
"""
Registers the given type with this environment, allowing all operators within to
(de-)serialize objects of the given type.
:param type: class of the objects to be (de-)serialized
:param serializer: instance of the serializer
:param deserializer: instance of the deserializer
"""
self._types.append((pack(">i",126 - len(self._types))[3:], type, serializer, deserializer))
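# Example (a sketch; Counter and its (de-)serializers are hypothetical user
# classes, assumed to expose serialize(value) -> bytes and
# deserialize(read) -> value as expected by the Python operators):
#
#   env.register_type(Counter, CounterSerializer(), CounterDeserializer())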
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
"""
Creates a DataSet that represents the tuples produced by reading the given CSV file.
:param path: The path of the CSV file.
:param types: Specifies the types for the CSV fields.
:param line_delimiter: The delimiter between lines, "\n" by default.
:param field_delimiter: The delimiter between fields, "," by default.
:return: A DataSet representing the tuples read from the given CSV file.
"""
child = OperationInfo()
child_set = DataSet(self, child)
child.identifier = _Identifier.SOURCE_CSV
child.delimiter_line = line_delimiter
child.delimiter_field = field_delimiter
child.path = path
child.types = types
self._sources.append(child)
return child_set
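# Example (a sketch; INT and STRING are the field type constants from
# flink.plan.Constants):
#
#   data = env.read_csv("file:///some/local/file.csv", (INT, STRING))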
def read_text(self, path):
"""
Creates a DataSet that represents the Strings produced by reading the given file line by line.
The file will be read with the system's default character set.
:param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
:return: A DataSet that represents the data read from the given file as text lines.
"""
child = OperationInfo()
child_set = DataSet(self, child)
child.identifier = _Identifier.SOURCE_TEXT
child.path = path
self._sources.append(child)
return child_set
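# Example:
#
#   lines = env.read_text("hdfs://host:port/file/path")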
def from_elements(self, *elements):
"""
Creates a new data set that contains the given elements.
The elements must all be of the same type, for example, all strings or all integers.
The sequence of elements must not be empty.
:param elements: The elements to make up the data set.
:return: A DataSet representing the given list of elements.
"""
child = OperationInfo()
child_set = DataSet(self, child)
child.identifier = _Identifier.SOURCE_VALUE
child.values = elements
self._sources.append(child)
return child_set
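# Example:
#
#   words = env.from_elements("foo", "bar", "baz")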
def generate_sequence(self, frm, to):
"""
Creates a new data set that contains the sequence of numbers from frm to to (both inclusive).
:param frm: The start number for the sequence.
:param to: The end number for the sequence.
:return: A DataSet representing the given sequence of numbers.
"""
child = OperationInfo()
child_set = DataSet(self, child)
child.identifier = _Identifier.SOURCE_SEQ
child.frm = frm
child.to = to
self._sources.append(child)
return child_set
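# Example:
#
#   numbers = env.generate_sequence(1, 1000)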
def set_parallelism(self, parallelism):
"""
Sets the parallelism for operations executed through this environment.
Setting a degree of parallelism (DOP) of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
:param parallelism: The degree of parallelism.
"""
self._dop = parallelism
def get_parallelism(self):
"""
Gets the parallelism with which operations are executed by default.
:return: The parallelism used by operations.
"""
return self._dop
def set_number_of_execution_retries(self, count):
"""
Sets the number of times that failed tasks are re-executed.
:param count: The number of execution retries.
"""
self._retry = count
def get_number_of_execution_retries(self):
"""
Gets the number of times that failed tasks are re-executed.
:return: The number of execution retries.
"""
return self._retry
def execute(self, local=False):
"""
Triggers the program execution.
The environment will execute all parts of the program that have resulted in a "sink" operation.
"""
self._local_mode = local
self._optimize_plan()
if self._container.is_planning():
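# plan mode: connect to the Java process on the announced port, send the
# serialized plan, and wait for the execution result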
port = int(sys.stdin.readline().rstrip('\n'))
self._connection = Connection.PureTCPConnection(port)
self._iterator = Iterator.PlanIterator(self._connection, self)
self._collector = Collector.PlanCollector(self._connection, self)
self._send_plan()
result = self._receive_result()
self._connection.close()
return result
else:
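# operator mode: read the id of the operator to run and its connection
# parameters from stdin, then execute that single operator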
import struct
operator = None
port = None
try:
if self._container.should_execute(self):
id = int(sys.stdin.readline().rstrip('\n'))
port = int(sys.stdin.readline().rstrip('\n'))
subtask_index = int(sys.stdin.readline().rstrip('\n'))
mmap_size = int(sys.stdin.readline().rstrip('\n'))
input_path = sys.stdin.readline().rstrip('\n')
output_path = sys.stdin.readline().rstrip('\n')
used_set = None
operator = None
for set in self._sets:
if set.id == id:
used_set = set
operator = set.operator
operator._configure(input_path, output_path, mmap_size, port, self, used_set, subtask_index)
operator._go()
operator._close()
sys.stdout.flush()
sys.stderr.flush()
except:
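# on any failure, flush the streams and signal the error to the Java
# side with -2 before re-raising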
sys.stdout.flush()
sys.stderr.flush()
if operator is not None and operator._connection is not None:
operator._connection._socket.send(struct.pack(">i", -2))
elif port is not None:
socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
socket.connect((SOCKET.gethostbyname("localhost"), port))
socket.send(struct.pack(">i", -2))
socket.close()
raise
def _optimize_plan(self):
self._find_chains()
def _find_chains(self):
chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP])
dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
x = len(self._sets) - 1
while x > -1:
# CHAIN(parent -> child) -> grand_child
# for all intents and purposes the child set ceases to exist; it is merged into the parent
child = self._sets[x]
child_type = child.identifier
if child_type in chainable:
parent = child.parent
# we can only chain to an actual python udf (=> operator is not None)
# we may only chain if the parent has only 1 child
# we may only chain if the parent is not used as a broadcast variable
# we may only chain if the parent does not use the child as a broadcast variable
if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0 and parent not in self._broadcast and child not in parent.bcvars:
parent.chained_info = child
parent.name += " -> " + child.name
parent.types = child.types
# grand_children now belong to the parent
for grand_child in child.children:
# dual_input operations have 2 parents; hence we have to change the correct one
if grand_child.identifier in dual_input:
if grand_child.parent.id == child.id:
grand_child.parent = parent
else:
grand_child.other = parent
else:
grand_child.parent = parent
parent.children.append(grand_child)
# if child is used as a broadcast variable the parent must now be used instead
for s in self._sets:
if child in s.bcvars:
s.bcvars.remove(child)
s.bcvars.append(parent)
for bcvar in self._broadcast:
if bcvar.other.id == child.id:
bcvar.other = parent
# child sinks now belong to the parent
for sink in child.sinks:
sink.parent = parent
parent.sinks.append(sink)
# child broadcast variables now belong to the parent
for bcvar in child.bcvars:
bcvar.parent = parent
parent.bcvars.append(bcvar)
# remove child set as it has been merged into the parent
parent.children.remove(child)
self._remove_set(child)
x -= 1
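# Chaining example: for a plan source -> map -> filter -> sink, the filter
# is merged into the map, yielding source -> "map -> filter" -> sink; both
# UDFs then run in a single Python operator.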
def _remove_set(self, set):
self._sets[:] = [s for s in self._sets if s.id != set.id]
def _send_plan(self):
self._send_parameters()
self._send_operations()
def _send_parameters(self):
collect = self._collector.collect
collect(("dop", self._dop))
collect(("mode", self._local_mode))
collect(("retry", self._retry))
collect(("id", self._env_id))
def _send_operations(self):
self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
for source in self._sources:
self._send_operation(source)
for set in self._sets:
self._send_operation(set)
for sink in self._sinks:
self._send_operation(sink)
for bcv in self._broadcast:
self._send_operation(bcv)
def _send_operation(self, set):
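# NOTE: the fields below are sent in a fixed order that the Java-side plan
# receiver is expected to mirror; fields that do not apply to the given
# operation are sent with their default values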
collect = self._collector.collect
collect(set.identifier)
collect(set.parent.id if set.parent is not None else -1)
collect(set.other.id if set.other is not None else -1)
collect(set.field)
collect(set.order)
collect(set.keys)
collect(set.key1)
collect(set.key2)
collect(set.types)
collect(set.uses_udf)
collect(set.name)
collect(set.delimiter_line)
collect(set.delimiter_field)
collect(set.write_mode)
collect(set.path)
collect(set.frm)
collect(set.to)
collect(set.id)
collect(set.to_err)
collect(set.count)
collect(len(set.values))
for value in set.values:
collect(value)
collect(set.parallelism.value)
def _receive_result(self):
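# the Java side reports the job's net runtime once execution has finished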
jer = JobExecutionResult()
jer._net_runtime = self._iterator.next()
return jer
class JobExecutionResult:
"""The result of a job execution, containing the job's net execution time."""
def __init__(self):
self._net_runtime = 0
def get_net_runtime(self):
"""
Gets the net execution time of the job, without any pre-flight steps like the optimizer.
:return: The net execution time, in milliseconds.
"""
return self._net_runtime