blob: 56a7bd8421e5a59271da8bf594694d6b077a76c9 [file] [log] [blame]
<?php
/**
* Kafka Client
*
* @category Libraries
* @package Kafka
* @author Lorenzo Alberton <l.alberton@quipo.it>
* @copyright 2011 Lorenzo Alberton
* @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
* @version $Revision: $
* @link http://sna-projects.com/kafka/
*/
/**
* Simple Kafka Producer
*
* @category Libraries
* @package Kafka
* @author Lorenzo Alberton <l.alberton@quipo.it>
* @license http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
* @link http://sna-projects.com/kafka/
*/
class Kafka_Producer
{
    /**
     * Request key; initialised to 0 by the constructor and preserved
     * across serialization. Not otherwise read inside this class.
     *
     * @var integer
     */
    protected $request_key;

    /**
     * Socket stream to the Kafka broker, or null when not connected.
     *
     * @var resource|null
     */
    protected $conn;

    /**
     * Broker host name or address.
     *
     * @var string
     */
    protected $host;

    /**
     * Broker TCP port.
     *
     * @var integer
     */
    protected $port;

    /**
     * Compression flag handed to Kafka_Encoder::encode_produce_request().
     * Defaults to 0 (presumably "no compression" -- confirm against
     * Kafka_Encoder).
     *
     * @var integer
     */
    protected $compression;

    /**
     * Constructor
     *
     * Only stores the connection parameters; the socket is opened lazily
     * by connect() / send().
     *
     * @param string  $host Host
     * @param integer $port Port
     */
    public function __construct($host, $port) {
        $this->request_key = 0;
        $this->host        = $host;
        $this->port        = $port;
        $this->compression = 0;
    }

    /**
     * Connect to Kafka via a socket
     *
     * Does nothing when a live connection is already open.
     *
     * @return void
     * @throws RuntimeException when the socket cannot be opened
     */
    public function connect() {
        if (is_resource($this->conn)) {
            return;
        }

        // Pre-initialise the by-reference outputs so the exception below
        // always has well-defined values to report.
        $errno  = 0;
        $errstr = '';
        $this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr);

        if (!is_resource($this->conn)) {
            throw new RuntimeException('Cannot connect to Kafka: ' . $errstr, $errno);
        }
    }

    /**
     * Close the socket
     *
     * Safe to call when no connection is open.
     *
     * @return void
     */
    public function close() {
        if (is_resource($this->conn)) {
            fclose($this->conn);
        }
        // Drop the (now closed) handle so no stale resource is kept around.
        $this->conn = null;
    }

    /**
     * Send messages to Kafka
     *
     * Connects on demand, encodes the produce request and writes it to
     * the socket. A single fwrite() on a network stream may write only
     * part of the buffer, so the write is repeated until the whole
     * request has been sent.
     *
     * @param array   $messages  Messages to send
     * @param string  $topic     Topic
     * @param integer $partition Partition (default 0xFFFFFFFF; presumably
     *                           means "no specific partition" -- confirm
     *                           against Kafka_Encoder / the broker protocol)
     *
     * @return integer|false Number of bytes written, or false on failure
     * @throws RuntimeException when the connection cannot be established
     */
    public function send(array $messages, $topic, $partition = 0xFFFFFFFF) {
        $this->connect();

        $request = Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression);
        $length  = strlen($request);
        $written = 0;

        while ($written < $length) {
            $bytes = fwrite($this->conn, $written === 0 ? $request : substr($request, $written));
            // false signals an error; 0 means the stream made no progress.
            // Treat both as failure rather than spinning forever.
            if ($bytes === false || $bytes === 0) {
                return false;
            }
            $written += $bytes;
        }

        return $written;
    }

    /**
     * When serializing, close the socket and save the connection parameters
     * so it can connect again
     *
     * The socket resource itself cannot be serialized, so it is closed and
     * left out; every other property is kept.
     *
     * @return array Properties to save
     */
    public function __sleep() {
        $this->close();
        return array('request_key', 'host', 'port', 'compression');
    }

    /**
     * Restore parameters on unserialize
     *
     * Payloads that do not carry the compression flag leave it null;
     * fall back to the constructor default in that case. The connection
     * is re-opened lazily by the next connect() / send().
     *
     * @return void
     */
    public function __wakeup() {
        if ($this->compression === null) {
            $this->compression = 0;
        }
    }
}