blob: 9c7163e8067e4fedf11e04e7589600e5915085ba [file] [log] [blame]
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* AMQPAppender appends log events to a AMQP.
*
* This appender uses a layout.
* Compatible with php_amqp versions: 1.0.8 - 1.3.0
*
* This class was originally contributed by Dmitry Ulyanov.
*
* ## Configurable parameters: ##
*
* - **host** - Server on which AMQP instance is located.
* - **port** - Port on which the instance is bound.
* - **vhost** - The name of the "virtual host".
* - **login** - Login used to connect to the AMQP server.
* - **password** - Password used to connect to the AMQP server.
* - **exchangeName** - Name of AMQP exchange which used to routing logs.
* - **exchangeType** - Type of AMQP exchange (direct | fanout). "direct" type is default.
* - **routingKey** - Routing key which used to routing logs. Set up AMQP server
* to route messages with this key to your queue
* - **contentType** - AMQP message "content-type" header. Example: "application/json", "application/octet-stream".
* Default - "text/plain".
* - **contentEncoding** - AMQP message "content-encoding" header. Default - "UTF-8".
* - **flushOnShutdown** - Stash logs and send on shutdown or send immediately.
* You can show content before start sending logs to AMQP. Disabled by default.
* - **connectionTimeout** - Connection timeout (in seconds). Default is 0.5.
*
* @package log4php
* @subpackage appenders
* @since 2.4.0
* @author Dmitry Ulyanov dmitriy.ulyanov@wikimart.ru
* @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
* @link http://logging.apache.org/log4php/docs/appenders/amqp.html Appender documentation
* @link http://github.com/d-ulyanov/log4php-graylog2 Dmitry Ulyanov's original submission
* @link http://www.rabbitmq.com/ RabbitMQ website
*/
class LoggerAppenderAMQP extends LoggerAppender
{
/** Default value for {@link $host} */
const DEFAULT_AMQP_HOST = 'localhost';
/** Default value for {@link $port} */
const DEFAULT_AMQP_PORT = 5672;
/** Default value for {@link $vhost} */
const DEFAULT_AMQP_VHOST = '/';
/** Default value for {@link $login} */
const DEFAULT_AMQP_LOGIN = 'guest';
/** Default value for {@link $password} */
const DEFAULT_AMQP_PASSWORD = 'guest';
/**
* Server on which AMQP instance is located
* @var string
*/
protected $host = self::DEFAULT_AMQP_HOST;
/**
* Port on which the instance is bound
* @var int
*/
protected $port = self::DEFAULT_AMQP_PORT;
/**
* The name of the "virtual host"
* @var string
*/
protected $vhost = self::DEFAULT_AMQP_VHOST;
/**
* Login used to connect to the AMQP server
* @var string
*/
protected $login = self::DEFAULT_AMQP_LOGIN;
/**
* Password used to connect to the AMQP server
* @var string
*/
protected $password = self::DEFAULT_AMQP_PASSWORD;
/**
* Name of AMQP exchange which used to routing logs
* @var string
*/
protected $exchangeName;
/**
* Type of AMQP exchange
* @var string
*/
protected $exchangeType = 'direct';
/**
* Routing key which used to routing logs
* @var string
*/
protected $routingKey;
/**
* Connection timeout in seconds
* @var float
*/
protected $connectionTimeout = 0.5;
/**
* Content-type header
* @var string
*/
protected $contentType = 'text/plain';
/**
* Content-encoding header
* @var string
*/
protected $contentEncoding = 'UTF-8';
/**
* Send logs immediately or stash it and send on shutdown
* @var boolean
*/
protected $flushOnShutdown = false;
/**
* @var AMQPConnection
*/
protected $AMQPConnection;
/**
* @var AMQPExchange
*/
protected $AMQPExchange;
/**
* Stashed logs
* @var array
*/
protected $logsStash = array();
/**
* Forwards the logging event to the AMQP.
* @param LoggerLoggingEvent $event
*/
protected function append(LoggerLoggingEvent $event)
{
$this->processLog(
$this->layout->format($event),
$this->getFlushOnShutdown()
);
}
/**
* @param string $message
* @param boolean $flushOnShutdown
*/
public function processLog($message, $flushOnShutdown)
{
if ($flushOnShutdown) {
$this->stashLog($message);
} else {
$this->sendLogToAMQP($message);
}
}
/**
* Setup AMQP connection.
* Based on defined options, this method connects to the AMQP
* and creates a {@link $AMQPConnection} and {@link $AMQPExchange}.
*/
public function activateOptions() {
try {
$connection = $this->createAMQPConnection(
$this->getHost(),
$this->getPort(),
$this->getVhost(),
$this->getLogin(),
$this->getPassword(),
$this->getConnectionTimeout()
);
$this->setAMQPConnection($connection);
$exchange = $this->createAMQPExchange(
$connection,
$this->getExchangeName(),
$this->getExchangeType()
);
$this->setAMQPExchange($exchange);
} catch (AMQPConnectionException $e) {
$this->closed = true;
$this->warn(sprintf('Failed to connect to amqp server: %s', $e->getMessage()));
} catch (AMQPChannelException $e) {
$this->closed = true;
$this->warn(sprintf('Failed to open amqp channel: %s', $e->getMessage()));
} catch (AMQPExchangeException $e) {
$this->closed = true;
$this->warn(sprintf('Failed to declare amqp exchange: %s', $e->getMessage()));
} catch (Exception $e) {
$this->closed = true;
$this->warn(sprintf('Amqp connection exception: %s', $e->getMessage()));
}
}
/**
* @param string $host
* @param int $port
* @param string $vhost
* @param string $login
* @param string $password
* @param float $connectionTimeout
* @return AMQPConnection
* @throws AMQPConnectionException
* @throws Exception
*/
protected function createAMQPConnection($host, $port, $vhost, $login, $password, $connectionTimeout)
{
$connection = new AMQPConnection();
$connection->setHost($host);
$connection->setPort($port);
$connection->setVhost($vhost);
$connection->setLogin($login);
$connection->setPassword($password);
$connection->setReadTimeout($connectionTimeout);
if (!$connection->connect()) {
throw new Exception('Cannot connect to the broker');
}
return $connection;
}
/**
* @param AMQPConnection $AMQPConnection
* @param $exchangeName
* @param $exchangeType
* @return AMQPExchange
* @throws AMQPConnectionException
* @throws AMQPExchangeException
* @throws Exception
*/
protected function createAMQPExchange($AMQPConnection, $exchangeName, $exchangeType)
{
$channel = new AMQPChannel($AMQPConnection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType($exchangeType);
$exchange->setFlags(AMQP_DURABLE);
// Since php_amqp 1.2.0: deprecate AMQPExchange::declare() in favor of AMQPExchange::declareExchange()
$declareMethodName = method_exists($exchange, 'declareExchange') ? 'declareExchange' : 'declare';
if (!$exchange->$declareMethodName()) {
throw new Exception('Cannot declare exchange');
}
return $exchange;
}
/**
* @param AMQPConnection $AMQPConnection
*/
protected function setAMQPConnection($AMQPConnection)
{
$this->AMQPConnection = $AMQPConnection;
}
/**
* @return AMQPConnection
*/
public function getAMQPConnection()
{
return $this->AMQPConnection;
}
/**
* @param AMQPExchange $AMQPExchange
*/
protected function setAMQPExchange($AMQPExchange)
{
$this->AMQPExchange = $AMQPExchange;
}
/**
* @return AMQPExchange
*/
public function getAMQPExchange()
{
return $this->AMQPExchange;
}
/**
* @param string $AMQPRoutingKey
*/
public function setRoutingKey($AMQPRoutingKey)
{
$this->setString('routingKey', $AMQPRoutingKey);
}
/**
* @return string
*/
public function getRoutingKey()
{
return $this->routingKey;
}
/**
* @param string $host
*/
public function setHost($host)
{
$this->setString('host', $host);
}
/**
* @return string
*/
public function getHost()
{
return $this->host;
}
/**
* @param string $login
*/
public function setLogin($login)
{
$this->setString('login', $login);
}
/**
* @return string
*/
public function getLogin()
{
return $this->login;
}
/**
* @param string $password
*/
public function setPassword($password)
{
$this->setString('password', $password);
}
/**
* @return string
*/
public function getPassword()
{
return $this->password;
}
/**
* @param int $port
*/
public function setPort($port)
{
$this->setPositiveInteger('port', $port);
}
/**
* @return int
*/
public function getPort()
{
return $this->port;
}
/**
* @param string $vhost
*/
public function setVhost($vhost)
{
$this->setString('vhost', $vhost);
}
/**
* @return string
*/
public function getVhost()
{
return $this->vhost;
}
/**
* @param string $exchange
*/
public function setExchangeName($exchange)
{
$this->setString('exchangeName', $exchange);
}
/**
* @return string
*/
public function getExchangeName()
{
return $this->exchangeName;
}
/**
* @param string $exchangeType
*/
public function setExchangeType($exchangeType)
{
$this->setString('exchangeType', $exchangeType);
}
/**
* @return string
*/
public function getExchangeType()
{
return $this->exchangeType;
}
/**
* @param string $contentEncoding
*/
public function setContentEncoding($contentEncoding)
{
$this->setString('contentEncoding', $contentEncoding);
}
/**
* @return string
*/
public function getContentEncoding()
{
return $this->contentEncoding;
}
/**
* @param string $contentType
*/
public function setContentType($contentType)
{
$this->setString('contentType', $contentType);
}
/**
* @return string
*/
public function getContentType()
{
return $this->contentType;
}
/**
* @param float $connectionTimeout
*/
public function setConnectionTimeout($connectionTimeout)
{
if (is_numeric($connectionTimeout) && $connectionTimeout > 0) {
$this->connectionTimeout = floatval($connectionTimeout);
} else {
$this->warn("Invalid value given for 'connectionTimeout' property: [$connectionTimeout]. Expected a positive float. Property not changed.");
}
}
/**
* @return float
*/
public function getConnectionTimeout()
{
return $this->connectionTimeout;
}
/**
* @param boolean $flushOnShutdown
*/
public function setFlushOnShutdown($flushOnShutdown)
{
$this->setBoolean('flushOnShutdown', $flushOnShutdown);
}
/**
* @return boolean
*/
public function getFlushOnShutdown()
{
return $this->flushOnShutdown;
}
/**
* @param array $logs Array of strings
*/
public function sendLogsArrayToAMQP($logs)
{
foreach ($logs as $log) {
if ($this->closed) {
break;
}
$this->sendLogToAMQP($log);
}
}
/**
* @param string $log
*/
public function sendLogToAMQP($log)
{
try {
$this->getAMQPExchange()->publish(
$log,
$this->getRoutingKey(),
AMQP_NOPARAM,
array(
'content_type' => $this->getContentType(),
'content_encoding' => $this->getContentEncoding()
)
);
} catch (AMQPConnectionException $e) {
$this->closed = true;
$this->warn(sprintf('Connection to the broker was lost: %s', $e->getMessage()));
} catch (AMQPChannelException $e) {
$this->closed = true;
$this->warn(sprintf('Channel is not open: %s', $e->getMessage()));
} catch (AMQPExchangeException $e) {
$this->closed = true;
$this->warn(sprintf('Failed to publish message: %s', $e->getMessage()));
} catch (Exception $e) {
$this->warn(sprintf('Failed to publish message, unknown exception: %s', $e->getMessage()));
}
}
/**
* @param string $log
*/
public function stashLog($log)
{
$this->logsStash[] = $log;
}
public function cleanStashedLogs()
{
$this->logsStash = array();
}
public function close()
{
if ($this->getFlushOnShutdown()) {
$this->sendLogsArrayToAMQP($this->logsStash);
$this->cleanStashedLogs();
}
$this->setAMQPExchange(null);
$this->setAMQPConnection(null);
parent::close();
}
}