| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # coding: utf-8 |
| """A server node for the key value store.""" |
| import ctypes |
| import sys |
| import pickle |
| import logging |
| from ..base import _LIB, check_call |
| from .base import create |
| |
| __all__ = ['KVStoreServer'] |
| |
| class KVStoreServer(object): |
| """The key-value store server.""" |
| def __init__(self, kvstore): |
| """Initialize a new KVStoreServer. |
| |
| Parameters |
| ---------- |
| kvstore : KVStore |
| """ |
| self.kvstore = kvstore |
| self.handle = kvstore.handle |
| self.init_logginig = False |
| |
| def _controller(self): |
| """Return the server controller.""" |
| def server_controller(cmd_id, cmd_body, _): |
| """Server controler.""" |
| if not self.init_logginig: |
| # the reason put the codes here is because we cannot get |
| # kvstore.rank earlier |
| head = '%(asctime)-15s Server[' + str( |
| self.kvstore.rank) + '] %(message)s' |
| logging.basicConfig(level=logging.DEBUG, format=head) |
| self.init_logginig = True |
| |
| if cmd_id == 0: |
| try: |
| optimizer = pickle.loads(cmd_body) |
| except: |
| raise |
| self.kvstore.set_optimizer(optimizer) |
| else: |
| print(f"server {self.kvstore.rank}, unknown command ({cmd_id}, {cmd_body})") |
| return server_controller |
| |
| def run(self): |
| """Run the server, whose behavior is like. |
| |
| |
| >>> while receive(x): |
| ... if is_command x: controller(x) |
| ... else if is_key_value x: updater(x) |
| """ |
| _ctrl_proto = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_void_p) |
| check_call(_LIB.MXKVStoreRunServer(self.handle, _ctrl_proto(self._controller()), None)) |
| |
| def _init_kvstore_server_module(): |
| """Start server/scheduler.""" |
| is_worker = ctypes.c_int() |
| check_call(_LIB.MXKVStoreIsWorkerNode(ctypes.byref(is_worker))) |
| if is_worker.value == 0: |
| kvstore = create('dist') |
| server = KVStoreServer(kvstore) |
| server.run() |
| sys.exit() |
| |
| _init_kvstore_server_module() |