blob: 3e06f9297367fbfb6727bf70d2876efea4b993bb [file] [log] [blame]
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace com\fenqile\fsof\registry\automatic;
class ZookeeperClient
{
/**
* @var Zookeeper
*/
private $zookeeper = null;
private $address;
private $func = null;
private $ephemeral = false;
private $zkFile = null;
private $waiteForConnectStateTimes = 20;
private $logger;
public function __construct()
{
$this->logger = \Logger::getLogger(__CLASS__);
}
public function __destruct()
{
$this->closeLog();
unset($this->zookeeper);
$this->logger->info('zookeeperClient destruct');
}
public function connect_cb($type, $event, $string)
{
// Watcher gets consumed so we need to set a new one
$this->logger->debug("connect state:{$event}");
$params = array($type, $event, $string);
if(isset($this->func))
{
$this->logger->warn("call connect state");
call_user_func_array($this->func, $params);
}
$this->logger->debug("connect_cb end");
}
public function connectZk($address, $ephemeral = false)
{
$ret = true;
$this->address = $address;
$this->ephemeral = $ephemeral;
try
{
if ($ephemeral)
{
$this->zookeeper = new \Zookeeper($this->address, array($this,'connect_cb'));
}
else
{
$this->zookeeper = new \Zookeeper($this->address);
if($this->zookeeper)
{
while(\Zookeeper::CONNECTED_STATE != $this->zookeeper->getState()
&&($this->waiteForConnectStateTimes > 0))
{
//等待连接状态
$this->waiteForConnectStateTimes--;
$this->logger->debug("wait for connect state");
usleep(50000);
}
}
}
if(empty($this->zookeeper))
{
$ret = false;
}
}
catch (\Exception $e)
{
$ret = false;
$this->logger->error($e->getMessage().'|address:'.$this->address);
}
return $ret;
}
public function registerCallFunc($func)
{
$this->func = $func;
}
/**
* 创建节点。
*
* @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers/URL.Encoder(provider url)。
* @param ephemeral,是否是临时节点,所有树干节点都为固定节点,只有叶子节点可以设为临时节点,没有设置,默认为固定节点
*/
public function create($path)
{
$ret = $this->set($path);
$this->logger->info('zookeeperClient create|path:'.$path.'|ret:'.$ret);
return $ret;
}
/**
* 删除节点,临时节点不需要删除,zookeeper在与对应的 client的连接断开后自动删除
* @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers/URL.Encoder(provider url)。
*/
public function delete($path, $version=-1)
{
if(isset($this->zookeeper))
{
if($this->zookeeper->exists($path))
{
$this->zookeeper->delete($path, $version);
}
}
return true;
}
private function set($path, $value=null)
{
if(isset($this->zookeeper))
{
if($this->zookeeper->exists($path))
{
$this->zookeeper->set($path, $value);
}
else
{
$this->makePath($path);
$this->makeNode($path, $value, $this->ephemeral);
}
}
return true;
}
/**
* 获取指定节点的所有childe节点
*
* @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。
*/
public function getChildren($path)
{
if((strlen($path) > 1) && preg_match('@/$@', $path))
{
// remove trailing /
$path = substr($path, 0, -1);
}
return $this->zookeeper->getChildren($path);
}
/**
* 为指定节点设置一个 childListener,用于监听其child变动情况
*
* @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。
* @param ChildListener
*/
public function addChildListener($path, $childListener)
{
}
/**
* 删除指定节点上的 childListener
*
* @param path 节点的绝对路径,如/fsof/URL.Encode("com.fenqile.example.calculate.AddService")/providers。
* @param ChildListener
*/
public function removeChildListener($path, $childeListener)
{
}
/**
* 设置一个client的state状态监听器,如在连接建立或重建时,主动向zookeeper server拉一次数据
*
* @param StateListener
*/
public function addStateListener($stateListener)
{
}
/**
* 删除一个client的state状态监听器
*
* @param StateListener
*/
public function removeStateListener($stateListener)
{
}
public function isConnected()
{
$ret = $this->zookeeper->getState();
if(\Zookeeper::CONNECTED_STATE == $ret)
{
return true;
}
else
{
return false;
}
}
public function close()
{
}
public function getUrl()
{
}
/**
* Equivalent of "mkdir -p" on ZooKeeper
*
* @param string $path The path to the node
* @param string $value The value to assign to each new node along the path
*
* @return bool
*/
private function makePath($path, $value = '')
{
$parts = explode('/', $path);
$parts = array_filter($parts);
$subpath = '';
while (count($parts) > 1)
{
$subpath .= '/' . array_shift($parts);
if (!$this->zookeeper->exists($subpath))
{
$this->makeNode($subpath, $value);
}
}
}
/**
* Create a node on ZooKeeper at the given path
*
* @param string $path The path to the node
* @param string $value The value to assign to the new node
* @param bool $endNode The value to assign to the end node
* @param array $params Optional parameters for the Zookeeper node.
* By default, a public node is created
*
* @return string the path to the newly created node or null on failure
*/
private function makeNode($path, $value, $ephemeral = false, array $params = array())
{
//$this->logger->debug("makeNode(".$path.",".$value.",".($ephemeral?1:0).",".json_encode($params).")");
if(empty($params))
{
$params = array(
array(
'perms' => \Zookeeper::PERM_ALL,
'scheme' => 'world',
'id' => 'anyone',
)
);
}
if ($ephemeral)
{
return $this->zookeeper->create($path, $value, $params , \Zookeeper::EPHEMERAL);
}
else
{
return $this->zookeeper->create($path, $value, $params);
}
}
public function setLogFile($file, $logLevel = 2)
{
$this->zkFile = fopen($file,"a+");
if($this->zkFile && $this->zookeeper)
{
$this->zookeeper->setDebugLevel($logLevel);
$this->zookeeper->setLogStream($this->zkFile);
}
}
private function closeLog()
{
if(!empty($this->zkFile))
{
fclose($this->zkFile);
}
}
}