/*
* Copyright 2016 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.heron.spouts.kafka;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.RawMultiScheme;
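/**
* Configuration for the Heron Kafka spout.
*
* <p>A minimal usage sketch (the ZooKeeper connect string, topic, and id below are
* illustrative assumptions, not values supplied by this class):
* <pre>{@code
* SpoutConfig.ZkHosts hosts =
*     new SpoutConfig.ZkHosts("zk1.example.com:2181", "/kafka/brokers");
* SpoutConfig config = new SpoutConfig(hosts, "my-topic", "/kafkaconsumer", "spout0");
* config.forceStartOffsetTime(-2); // replay from the earliest available offset
* }</pre>
*/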
public class SpoutConfig implements Serializable {
private static final long DEFAULT_TUPLE_IN_THE_PAST_THRESHOLD = 30000; // 30s
public static final String TOPOLOGY_STORE_UPDATE_MSEC = "topology.store.update.msec";
public static final String START_OFFSET_DELTA = "spout.offset.delta";
/**
* Array of filter operator config entries. Each entry contains the filter class name,
* e.g. com.twitter.heron.spouts.kafka.LagAvoidanceFilter, the name of the spout,
* e.g. spout0, and the filter parameters, e.g. {}.
*/
public static final String TOPOLOGY_FILTER_CONFIG = "topology.filter.config";
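// A hypothetical value for TOPOLOGY_FILTER_CONFIG matching the description above (the
// exact format of each entry is an assumption for illustration):
//   ["com.twitter.heron.spouts.kafka.LagAvoidanceFilter", "spout0", "{}"]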
public static final String SPOUT_SHOULD_REPLAY_FAILED_OFFSETS =
"spout.should.replay.failed.offsets";
public static final String SPOUT_MAX_FAILED_OFFSETS = "spout.max.failed.offsets";
public String topic; // Required.
public KafkaConfig.BrokerHosts hosts; // Required.
public List<String> zkServers = new ArrayList<String>();
public int zkPort = 2181;
public String zkRoot = "/kafkaconsumer";
public String id = null;
public int storeUpdateMsec = 2 * 1000;
public int zookeeperStoreUpdateMsec = -1; // Disabled by default; we no longer write offsets to ZooKeeper.
public int zookeeperStoreSessionTimeout = 40000;
public FilterOperator filterOperator;
public boolean shouldAnchorEmits = true;
public boolean shouldReplay = true;
public int zookeeperRetryCount = 5;
public int zookeeperRetryInterval = 60 * 1000;
public TimestampExtractor timestampExtractor = null;
public int emitQueueMaxSize = 1000;
// If a timestamp extractor is specified, we extract the timestamps associated with all tuples we
// read from Kafka partitions, and we keep track of the latest timestamp. If the timestamp
// associated with a tuple is too far in the past compared to the latest timestamp, we increment
// the tuplesInThePast counter. This constant determines how far in the past the tuple's timestamp
// should be in order for the counter to be incremented. The default is 30 seconds.
public long tupleInThePastThreshold = DEFAULT_TUPLE_IN_THE_PAST_THRESHOLD;
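// A minimal sketch of the check described above (illustrative only; latestTimestamp,
// tupleTimestamp, and tuplesInThePast are assumed names, not members of this class):
//
//   if (latestTimestamp - tupleTimestamp > tupleInThePastThreshold) {
//     tuplesInThePast++;
//   }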
public int fetchSizeBytes = 10 * 1024 * 1024; // 10MB
public MultiScheme scheme = new RawMultiScheme();
public long startOffsetTime = -1; // Default set to start from latest.
public boolean forceFromStart = false;
public int socketTimeoutMs = 1 * 60 * 1000; // 1 min
public int bufferSizeBytes = 10 * 1024 * 1024; // 10 MB buffer
public long maxFailedTupleCount = 10; // Max failed tuples tracked at a time; any more will be dropped.
/*
* Read the Kafka queue with a fixed lag (in bytes).
*
* For example, if this is set to 1048576 (1MB), the Kafka reader will always keep a
* fixed 1MB lag behind the head of that Kafka partition. If the queue has 100
* partitions, the topology will keep a 100MB lag behind the head in total.
*
* Note that this feature slows the topology down a little. Please add some workers
* if you would like to use it.
*
* By default this is set to -1, which disables the feature.
*/
public long fixedLagBytes = -1;
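// Example: keep a fixed 1MB lag behind the head of every partition (a sketch; "config"
// is assumed to be a SpoutConfig instance):
//
//   config.fixedLagBytes = 1024 * 1024;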
/**
* Creates a config, to be passed to the Kafka spout, containing broker information and the
* spout id.
*/
public SpoutConfig(KafkaConfig.BrokerHosts hosts, String topic, String zkRoot, String id) {
this.hosts = hosts;
this.topic = topic;
if (zkRoot != null) {
this.zkRoot = zkRoot;
}
this.id = id;
}
/**
* Configures the spout to start reading Kafka partitions at the offset that stores the events
* added at time {@code earliestEventTimestamp}.
*
* There are two special values for {@code earliestEventTimestamp}:
* -1: forces the spout to start reading from the latest offset.
* -2: forces the spout to start reading from the earliest offset.
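*
* <p>For example, {@code forceStartOffsetTime(-2)} replays the topic from the beginning,
* while a recent wall-clock timestamp (for instance, one hour ago in milliseconds) starts
* reading from the offsets stored around that time.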
*
* @param earliestEventTimestamp The timestamp (in milliseconds) when the earliest event was added
* to the Kafka topic.
*/
public void forceStartOffsetTime(long earliestEventTimestamp) {
startOffsetTime = earliestEventTimestamp;
forceFromStart = true;
}
/** Creates Kafka connections using ZooKeeper for service discovery. */
public static class ZkHosts implements KafkaConfig.BrokerHosts {
public String brokerZkStr = null;
public String brokerZkPath = null; // e.g., /kafka/brokers
public int refreshFreqMSecs = 120 * 1000;
public ZkHosts(String brokerZkStr, String brokerZkPath) {
this.brokerZkStr = brokerZkStr;
this.brokerZkPath = brokerZkPath;
}
}
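// Example: discover brokers via ZooKeeper and refresh the broker list every minute
// (a sketch; the connect string is an illustrative assumption):
//
//   ZkHosts hosts = new ZkHosts("zk1.example.com:2181", "/kafka/brokers");
//   hosts.refreshFreqMSecs = 60 * 1000;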
}