blob: 3b5b77b22c4bc69f3706cb52372801444470dfae [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require File.dirname(__FILE__) + '/spec_helper'
describe Consumer do
before(:each) do
@mocked_socket = mock(TCPSocket)
TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
@consumer = Consumer.new
end
describe "Kafka Consumer" do
it "should have a CONSUME_REQUEST_TYPE" do
Consumer::CONSUME_REQUEST_TYPE.should eql(1)
@consumer.should respond_to(:request_type)
end
it "should have a topic and a partition" do
@consumer.should respond_to(:topic)
@consumer.should respond_to(:partition)
end
it "should have a polling option, and a default value" do
Consumer::DEFAULT_POLLING_INTERVAL.should eql(2)
@consumer.should respond_to(:polling)
@consumer.polling.should eql(2)
end
it "should set a topic and partition on initialize" do
@consumer = Consumer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
@consumer.topic.should eql("testing")
@consumer.partition.should eql(0)
@consumer = Consumer.new({ :topic => "testing", :partition => 3 })
@consumer.partition.should eql(3)
end
it "should set default host and port if none is specified" do
@consumer = Consumer.new
@consumer.host.should eql("localhost")
@consumer.port.should eql(9092)
end
it "should have a default offset, and be able to set it" do
@consumer.offset.should eql(0)
@consumer = Consumer.new({ :offset => 1111 })
@consumer.offset.should eql(1111)
end
it "should have a max size" do
Consumer::MAX_SIZE.should eql(1048576)
@consumer.max_size.should eql(1048576)
end
it "should return the size of the request" do
@consumer.request_size.should eql(24)
@consumer.topic = "someothertopicname"
@consumer.request_size.should eql(38)
@consumer.encode_request_size.should eql([@consumer.request_size].pack("N"))
end
it "should encode a request to consume" do
bytes = [Kafka::Consumer::CONSUME_REQUEST_TYPE].pack("n") + ["test".length].pack("n") + "test" + [0].pack("N") + [0].pack("L_") + [Kafka::Consumer::MAX_SIZE].pack("N")
@consumer.encode_request(Kafka::Consumer::CONSUME_REQUEST_TYPE, "test", 0, 0, Kafka::Consumer::MAX_SIZE).should eql(bytes)
end
it "should read the response data" do
bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
@mocked_socket.should_receive(:read).exactly(:twice).and_return(bytes)
@consumer.read_data_response.should eql(bytes[2, bytes.length])
end
it "should send a consumer request" do
@consumer.stub!(:encode_request_size).and_return(666)
@consumer.stub!(:encode_request).and_return("someencodedrequest")
@consumer.should_receive(:write).with("someencodedrequest").exactly(:once).and_return(true)
@consumer.should_receive(:write).with(666).exactly(:once).and_return(true)
@consumer.send_consume_request.should eql(true)
end
it "should parse a message set from bytes" do
bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
message = @consumer.parse_message_set_from(bytes).first
message.payload.should eql("ale")
message.checksum.should eql(1120192889)
message.magic.should eql(0)
message.valid?.should eql(true)
end
it "should consume messages" do
@consumer.should_receive(:send_consume_request).and_return(true)
@consumer.should_receive(:read_data_response).and_return("")
@consumer.consume.should eql([])
end
it "should loop and execute a block with the consumed messages" do
@consumer.stub!(:consume).and_return([mock(Kafka::Message)])
messages = []
messages.should_receive(:<<).exactly(:once).and_return([])
@consumer.loop do |message|
messages << message
break # we don't wanna loop forever on the test
end
end
it "should loop (every N seconds, configurable on polling attribute), and execute a block with the consumed messages" do
@consumer = Consumer.new({ :polling => 1 })
@consumer.stub!(:consume).and_return([mock(Kafka::Message)])
messages = []
messages.should_receive(:<<).exactly(:twice).and_return([])
executed_times = 0
@consumer.loop do |message|
messages << message
executed_times += 1
break if executed_times >= 2 # we don't wanna loop forever on the test, only 2 seconds
end
executed_times.should eql(2)
end
end
end