blob: 947c792ccc525db14fc900eb10bc1606f18b39d2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require File.dirname(__FILE__) + '/spec_helper'
describe Producer do
before(:each) do
@mocked_socket = mock(TCPSocket)
TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
@producer = Producer.new
end
describe "Kafka Producer" do
it "should have a PRODUCE_REQUEST_ID" do
Producer::PRODUCE_REQUEST_ID.should eql(0)
end
it "should have a topic and a partition" do
@producer.should respond_to(:topic)
@producer.should respond_to(:partition)
end
it "should set a topic and partition on initialize" do
@producer = Producer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
@producer.topic.should eql("testing")
@producer.partition.should eql(0)
@producer = Producer.new({ :topic => "testing", :partition => 3 })
@producer.partition.should eql(3)
end
it "should set default host and port if none is specified" do
@producer = Producer.new
@producer.host.should eql("localhost")
@producer.port.should eql(9092)
end
describe "Message Encoding" do
it "should encode a message" do
message = Kafka::Message.new("alejandro")
full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
@producer.encode(message).should eql(full_message)
end
it "should encode an empty message" do
message = Kafka::Message.new()
full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
@producer.encode(message).should eql(full_message)
end
end
describe "Request Encoding" do
it "should binary encode an empty request" do
bytes = @producer.encode_request("test", 0, [])
bytes.length.should eql(20)
bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
end
it "should binary encode a request with a message, using a specific wire format" do
message = Kafka::Message.new("ale")
bytes = @producer.encode_request("test", 3, message)
data_size = bytes[0, 4].unpack("N").shift
request_id = bytes[4, 2].unpack("n").shift
topic_length = bytes[6, 2].unpack("n").shift
topic = bytes[8, 4]
partition = bytes[12, 4].unpack("N").shift
messages_length = bytes[16, 4].unpack("N").shift
messages = bytes[20, messages_length]
bytes.length.should eql(32)
data_size.should eql(28)
request_id.should eql(0)
topic_length.should eql(4)
topic.should eql("test")
partition.should eql(3)
messages_length.should eql(12)
end
end
end
it "should send messages" do
@producer.should_receive(:write).and_return(32)
message = Kafka::Message.new("ale")
@producer.send(message).should eql(32)
end
describe "Message Batching" do
it "should batch messages and send them at once" do
message1 = Kafka::Message.new("one")
message2 = Kafka::Message.new("two")
@producer.should_receive(:send).with([message1, message2]).exactly(:once).and_return(nil)
@producer.batch do |messages|
messages << message1
messages << message2
end
end
end
end