# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'logstash/namespace'
require 'logstash/outputs/base'
require 'jruby-kafka'
# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages not only with the message itself but also with a
# timestamp and hostname. If you want only the message itself to pass through, set the output
# configuration to something like:
# [source,ruby]
#     output {
#       kafka {
#         codec => plain {
#           format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#producerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'
  milestone 2

  default :codec, 'json'

  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
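  # For example, to bootstrap from two brokers (the hostnames below are illustrative):
  # [source,ruby]
  #     output {
  #       kafka {
  #         broker_list => "kafka01:9092,kafka02:9092"
  #         topic_id => "logstash"
  #       }
  #     }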
  config :broker_list, :validate => :string, :default => 'localhost:9092'

  # The topic to produce the messages to
  config :topic_id, :validate => :string, :required => true

  # This parameter allows you to specify the compression codec for all data generated by this
  # producer. Valid values are `none`, `gzip` and `snappy`.
  config :compression_codec, :validate => %w( none gzip snappy ), :default => 'none'

  # This parameter allows you to turn compression on only for particular topics. If
  # `compression_codec` is anything other than `none` and this list is non-empty, compression is
  # enabled only for the topics listed here. If the list is empty, the specified compression
  # codec is applied to all topics. If `compression_codec` is `none`, compression is disabled
  # for all topics regardless of this setting.
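  # For example, to compress only a hypothetical high-volume topic with snappy:
  # [source,ruby]
  #     output {
  #       kafka {
  #         topic_id => "weblogs"
  #         compression_codec => "snappy"
  #         compressed_topics => "weblogs"
  #       }
  #     }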
  config :compressed_topics, :validate => :string, :default => ''

  # This value controls when a produce request is considered completed. Specifically,
  # how many other brokers must have committed the data to their log and acknowledged this to the
  # leader. With `0` the producer never waits for an acknowledgement, `1` waits for the leader
  # replica only, and `-1` waits for all in-sync replicas. For more info,
  # see http://kafka.apache.org/documentation.html#producerconfigs
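  # For example, a common durability/latency trade-off is to wait for the leader only (values
  # here are illustrative, not a recommendation):
  # [source,ruby]
  #     output {
  #       kafka {
  #         topic_id => "logstash"
  #         request_required_acks => 1
  #       }
  #     }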
  config :request_required_acks, :validate => [-1,0,1], :default => 0

  # The serializer class for messages. The default, `kafka.serializer.StringEncoder`, encodes
  # messages as UTF-8 strings.
  config :serializer_class, :validate => :string, :default => 'kafka.serializer.StringEncoder'

  # The partitioner class for partitioning messages amongst partitions in the topic. The default
  # partitioner is based on the hash of the key. If the key is null,
  # the message is sent to a random partition in the broker.
  # NOTE: when no key is set, `topic_metadata_refresh_interval_ms` controls how long the producer
  # sticks with a randomly chosen partition. It defaults to 10 minutes, so the producer will keep
  # writing to a single partition for 10 minutes before switching to another one.
  config :partitioner_class, :validate => :string, :default => 'kafka.producer.DefaultPartitioner'

  # The amount of time the broker will wait trying to meet the `request.required.acks` requirement
  # before sending back an error to the client.
  config :request_timeout_ms, :validate => :number, :default => 10000

  # This parameter specifies whether the messages are sent asynchronously in a background thread.
  # Valid values are `async` for asynchronous send and `sync` for synchronous send. Setting the
  # producer to async allows batching together of requests (which is great for throughput), but
  # opens the possibility of unsent data being dropped if the client machine fails.
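  # For example, an asynchronous producer tuned for throughput might look like this (the
  # buffering values below are illustrative, not recommendations):
  # [source,ruby]
  #     output {
  #       kafka {
  #         topic_id => "logstash"
  #         producer_type => "async"
  #         batch_num_messages => 500
  #         queue_buffering_max_ms => 1000
  #       }
  #     }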
  config :producer_type, :validate => %w( sync async ), :default => 'sync'

  # The serializer class for keys (defaults to the same as for messages if nothing is given)
  config :key_serializer_class, :validate => :string, :default => 'kafka.serializer.StringEncoder'

  # This property will cause the producer to automatically retry a failed send request. This
  # property specifies the number of retries when such failures occur. Note that setting a
  # non-zero value here can lead to duplicates in the case of network errors that cause a message
  # to be sent but the acknowledgement to be lost.
  config :message_send_max_retries, :validate => :number, :default => 3

  # Before each retry, the producer refreshes the metadata of relevant topics to see if a new
  # leader has been elected. Since leader election takes a bit of time,
  # this property specifies the amount of time that the producer waits before refreshing the
  # metadata.
  config :retry_backoff_ms, :validate => :number, :default => 100

  # The producer generally refreshes the topic metadata from brokers when there is a failure
  # (partition missing, leader not available...). It will also poll regularly (default: every
  # 10 minutes, i.e. 600000 ms). If you set this to a negative value, metadata will only get
  # refreshed on failure. If you set this to zero, the metadata will get refreshed after each
  # message sent (not recommended). Important note: the refresh happens only AFTER the message
  # is sent, so if the producer never sends a message the metadata is never refreshed.
  config :topic_metadata_refresh_interval_ms, :validate => :number, :default => 600 * 1000

  # Maximum time to buffer data when using async mode. For example, a setting of 100 will try to
  # batch together 100ms of messages to send at once. This will improve throughput but adds
  # message delivery latency due to the buffering.
  config :queue_buffering_max_ms, :validate => :number, :default => 5000

  # The maximum number of unsent messages that can be queued up by the producer when using async
  # mode before either the producer must be blocked or data must be dropped.
  config :queue_buffering_max_messages, :validate => :number, :default => 10000

  # The amount of time to block before dropping messages when running in async mode and the
  # buffer has reached `queue.buffering.max.messages`. If set to 0, events will be enqueued
  # immediately or dropped if the queue is full (the producer send call will never block). If set
  # to -1, the producer will block indefinitely and never willingly drop a send.
  config :queue_enqueue_timeout_ms, :validate => :number, :default => -1

  # The number of messages to send in one batch when using async mode. The producer will wait
  # until either this number of messages are ready to send or `queue.buffering.max.ms` is reached.
  config :batch_num_messages, :validate => :number, :default => 200

  # Socket write buffer size
  config :send_buffer_bytes, :validate => :number, :default => 100 * 1024

  # The client id is a user-specified string sent in each request to help trace calls. It should
  # logically identify the application making the request.
  config :client_id, :validate => :string, :default => ''

  # Provides a way to specify a partition key as a string. To specify a partition key for
  # Kafka, configure a format that will produce the key as a string. `key_serializer_class`
  # defaults to `kafka.serializer.StringEncoder` to match. For example, to partition by host:
  # [source,ruby]
  #     output {
  #       kafka {
  #         partition_key_format => "%{host}"
  #       }
  #     }
  config :partition_key_format, :validate => :string, :default => nil

  public
  def register
    LogStash::Logger.setup_log4j(@logger)

    # Map the plugin settings onto jruby-kafka producer options.
    options = {
      :broker_list => @broker_list,
      :compression_codec => @compression_codec,
      :compressed_topics => @compressed_topics,
      :request_required_acks => @request_required_acks,
      :serializer_class => @serializer_class,
      :partitioner_class => @partitioner_class,
      :request_timeout_ms => @request_timeout_ms,
      :producer_type => @producer_type,
      :key_serializer_class => @key_serializer_class,
      :message_send_max_retries => @message_send_max_retries,
      :retry_backoff_ms => @retry_backoff_ms,
      :topic_metadata_refresh_interval_ms => @topic_metadata_refresh_interval_ms,
      :queue_buffering_max_ms => @queue_buffering_max_ms,
      :queue_buffering_max_messages => @queue_buffering_max_messages,
      :queue_enqueue_timeout_ms => @queue_enqueue_timeout_ms,
      :batch_num_messages => @batch_num_messages,
      :send_buffer_bytes => @send_buffer_bytes,
      :client_id => @client_id
    }
    @producer = Kafka::Producer.new(options)
    @producer.connect

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)

    # The codec hands back the serialized event; send it to the topic and partition key
    # resolved in receive().
    @codec.on_event do |data|
      begin
        @producer.send_msg(@current_topic_id, @partition_key, data)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting',
                     :exception => e)
      end
    end
  end # def register

  def receive(event)
    return unless output?(event)
    if event == LogStash::SHUTDOWN
      finished
      return
    end

    # Resolve the topic and optional partition key for this event, then hand the event to the
    # codec; the codec's on_event callback (see register) performs the actual send.
    @partition_key = @partition_key_format.nil? ? nil : event.sprintf(@partition_key_format)
    @current_topic_id = @topic_id.nil? ? nil : event.sprintf(@topic_id)
    @codec.encode(event)
    @partition_key = nil
    @current_topic_id = nil
  end

  def teardown
    @producer.close
  end
end # class LogStash::Outputs::Kafka