blob: 53b9471508215577cab481c284365739d38df8f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jwplayer.sqe.language.stream.kafka;
import com.google.common.base.Joiner;
import com.jwplayer.sqe.trident.spout.kafka.SqeRawFullScheme;
import org.apache.storm.kafka.FullSchemeAsMultiScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
public class KafkaStreamAdapterOptions implements Serializable {
Integer bufferSizeBytes = null;
String clientID;
Integer fetchSizeBytes = null;
Boolean filterReplays = false;
Long filterReplaysMetadataTtl = 60L * 60L * 24L * 2L;
Long maxOffsetBehind = null;
ZkHosts zkHosts;
public TridentKafkaConfig getKafkaConfig(String topologyName, String streamName, String topic) {
TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic, clientID);
if(bufferSizeBytes != null) config.bufferSizeBytes = bufferSizeBytes;
if(fetchSizeBytes != null) config.fetchSizeBytes = fetchSizeBytes;
if(maxOffsetBehind != null) config.maxOffsetBehind = maxOffsetBehind;
config.scheme = new FullSchemeAsMultiScheme(
new SqeRawFullScheme(topologyName, streamName, zkHosts)
);
return config;
}
@SuppressWarnings("unchecked")
public static KafkaStreamAdapterOptions parse(Map map) {
KafkaStreamAdapterOptions options = new KafkaStreamAdapterOptions();
options.zkHosts = new ZkHosts(Joiner.on(',').join((List<String>) map.get("jw.sqe.spout.kafka.zkhosts")));
options.clientID = (String) map.get("jw.sqe.spout.kafka.clientid");
if(map.containsKey("jw.sqe.spout.kafka.bufferSizeBytes"))
options.bufferSizeBytes = (int) map.get("jw.sqe.spout.kafka.bufferSizeBytes");
if(map.containsKey("jw.sqe.spout.kafka.fetchSizeBytes"))
options.fetchSizeBytes = (int) map.get("jw.sqe.spout.kafka.fetchSizeBytes");
if(map.containsKey("jw.sqe.spout.kafka.filterReplays"))
options.filterReplays = (boolean) map.get("jw.sqe.spout.kafka.filterReplays");
if(map.containsKey("jw.sqe.spout.kafka.filterReplays.metadata.ttl"))
options.filterReplaysMetadataTtl = (long) map.get("jw.sqe.spout.kafka.filterReplays.metadata.ttl");
if(map.containsKey("jw.sqe.spout.kafka.maxOffsetBehind"))
options.maxOffsetBehind = (long) map.get("jw.sqe.spout.kafka.maxOffsetBehind");
return options;
}
}