blob: fca9cc4666ae66ff35a6def0fbdd74ed8b8a3dc0 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' filestatemanager.py '''
import os
import threading
from collections import defaultdict
from heron.statemgrs.src.python.statemanager import StateManager
from heron.proto.execution_state_pb2 import ExecutionState
from heron.proto.packing_plan_pb2 import PackingPlan
from heron.proto.physical_plan_pb2 import PhysicalPlan
from heron.proto.scheduler_pb2 import SchedulerLocation
from heron.proto.tmaster_pb2 import TMasterLocation
from heron.proto.topology_pb2 import Topology
# pylint: disable=too-many-instance-attributes
class FileStateManager(StateManager):
"""
State manager which reads states from local file system.
This is not a production level state manager. The watches
are based on polling the file system at regular intervals.
"""
def __init__(self, name, rootpath):
self.name = name
self.rootpath = rootpath
# This is the cache of the state directories.
self.topologies_directory = {}
self.execution_state_directory = {}
self.packing_plan_directory = {}
self.pplan_directory = {}
self.tmaster_directory = {}
self.scheduler_location_directory = {}
# The watches are triggered when there
# is a corresponding change.
# The list contains the callbacks to be called
# when topologies change.
self.topologies_watchers = []
# The dictionary is from the topology name
# to the callback.
self.topology_watchers = defaultdict(lambda: [])
self.execution_state_watchers = defaultdict(lambda: [])
self.packing_plan_watchers = defaultdict(lambda: [])
self.pplan_watchers = defaultdict(lambda: [])
self.tmaster_watchers = defaultdict(lambda: [])
self.scheduler_location_watchers = defaultdict(lambda: [])
# Instantiate the monitoring thread.
self.monitoring_thread = threading.Thread(target=self.monitor)
# cancellable sleep
self.event = threading.Event()
# pylint: disable=attribute-defined-outside-init
def start(self):
""" start monitoring thread """
self.monitoring_thread_stop_signal = False
self.monitoring_thread.start()
def stop(self):
"""" stop monitoring thread """
self.monitoring_thread_stop_signal = True
self.event.set()
def monitor(self):
"""
Monitor the rootpath and call the callback
corresponding to the change.
This monitoring happens periodically. This function
is called in a seperate thread from the main thread,
because it sleeps for the intervals between each poll.
"""
def trigger_watches_based_on_files(watchers, path, directory, ProtoClass):
"""
For all the topologies in the watchers, check if the data
in directory has changed. Trigger the callback if it has.
"""
for topology, callbacks in list(watchers.items()):
file_path = os.path.join(path, topology)
data = ""
if os.path.exists(file_path):
with open(os.path.join(path, topology)) as f:
data = f.read()
if topology not in directory or data != directory[topology]:
proto_object = ProtoClass()
proto_object.ParseFromString(data)
for callback in callbacks:
callback(proto_object)
directory[topology] = data
while not self.monitoring_thread_stop_signal:
topologies_path = self.get_topologies_path()
topologies = []
if os.path.isdir(topologies_path):
topologies = list([f for f in os.listdir(topologies_path)
if os.path.isfile(os.path.join(topologies_path, f))])
if set(topologies) != set(self.topologies_directory):
for callback in self.topologies_watchers:
callback(topologies)
self.topologies_directory = topologies
trigger_watches_based_on_files(
self.topology_watchers, topologies_path, self.topologies_directory, Topology)
# Get the directory name for execution state
execution_state_path = os.path.dirname(self.get_execution_state_path(""))
trigger_watches_based_on_files(
self.execution_state_watchers, execution_state_path,
self.execution_state_directory, ExecutionState)
# Get the directory name for packing_plan
packing_plan_path = os.path.dirname(self.get_packing_plan_path(""))
trigger_watches_based_on_files(
self.packing_plan_watchers, packing_plan_path, self.packing_plan_directory, PackingPlan)
# Get the directory name for pplan
pplan_path = os.path.dirname(self.get_pplan_path(""))
trigger_watches_based_on_files(
self.pplan_watchers, pplan_path,
self.pplan_directory, PhysicalPlan)
# Get the directory name for tmaster
tmaster_path = os.path.dirname(self.get_tmaster_path(""))
trigger_watches_based_on_files(
self.tmaster_watchers, tmaster_path,
self.tmaster_directory, TMasterLocation)
# Get the directory name for scheduler location
scheduler_location_path = os.path.dirname(self.get_scheduler_location_path(""))
trigger_watches_based_on_files(
self.scheduler_location_watchers, scheduler_location_path,
self.scheduler_location_directory, SchedulerLocation)
# Sleep for some time
self.event.wait(timeout=5)
def get_topologies(self, callback=None):
"""get topologies"""
if callback:
self.topologies_watchers.append(callback)
else:
topologies_path = self.get_topologies_path()
return [f for f in os.listdir(topologies_path)
if os.path.isfile(os.path.join(topologies_path, f))]
def get_topology(self, topologyName, callback=None):
"""get topology"""
if callback:
self.topology_watchers[topologyName].append(callback)
else:
topology_path = self.get_topology_path(topologyName)
with open(topology_path) as f:
data = f.read()
topology = Topology()
topology.ParseFromString(data)
return topology
def create_topology(self, topologyName, topology):
"""
Create path is currently not supported in file based state manager.
"""
pass
def delete_topology(self, topologyName):
"""
Delete path is currently not supported in file based state manager.
"""
pass
def get_packing_plan(self, topologyName, callback=None):
""" get packing plan """
if callback:
self.packing_plan_watchers[topologyName].append(callback)
else:
packing_plan_path = self.get_packing_plan_path(topologyName)
with open(packing_plan_path) as f:
data = f.read()
packing_plan = PackingPlan()
packing_plan.ParseFromString(data)
def get_pplan(self, topologyName, callback=None):
"""
Get physical plan of a topology
"""
if callback:
self.pplan_watchers[topologyName].append(callback)
else:
pplan_path = self.get_pplan_path(topologyName)
with open(pplan_path) as f:
data = f.read()
pplan = PhysicalPlan()
pplan.ParseFromString(data)
return pplan
def create_pplan(self, topologyName, pplan):
"""
Create path is currently not supported in file based state manager.
"""
pass
def delete_pplan(self, topologyName):
"""
Delete path is currently not supported in file based state manager.
"""
pass
def get_execution_state(self, topologyName, callback=None):
"""
Get execution state
"""
if callback:
self.execution_state_watchers[topologyName].append(callback)
else:
execution_state_path = self.get_execution_state_path(topologyName)
with open(execution_state_path) as f:
data = f.read()
executionState = ExecutionState()
executionState.ParseFromString(data)
return executionState
def create_execution_state(self, topologyName, executionState):
"""
Create path is currently not supported in file based state manager.
"""
pass
def delete_execution_state(self, topologyName):
"""
Delete path is currently not supported in file based state manager.
"""
pass
def get_tmaster(self, topologyName, callback=None):
"""
Get tmaster
"""
if callback:
self.tmaster_watchers[topologyName].append(callback)
else:
tmaster_path = self.get_tmaster_path(topologyName)
with open(tmaster_path) as f:
data = f.read()
tmaster = TMasterLocation()
tmaster.ParseFromString(data)
return tmaster
def get_scheduler_location(self, topologyName, callback=None):
"""
Get scheduler location
"""
if callback:
self.scheduler_location_watchers[topologyName].append(callback)
else:
scheduler_location_path = self.get_scheduler_location_path(topologyName)
with open(scheduler_location_path) as f:
data = f.read()
scheduler_location = SchedulerLocation()
scheduler_location.ParseFromString(data)
return scheduler_location