blob: 57308a10635a878ff94bac08c8a7af1646b018e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.sql.table;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
import org.apache.apex.malhar.sql.operators.OperatorUtils;
import org.apache.apex.malhar.sql.planner.RelInfo;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
/**
* This is an implementation of {@link Endpoint} which defines how data should be read from / written to the Kafka messaging system.
*
* @since 3.6.0
*/
@InterfaceStability.Evolving
public class KafkaEndpoint implements Endpoint
{
  /** Class name configured as the Kafka consumer key deserializer. */
  public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
  /** Class name configured as the Kafka consumer value deserializer. */
  public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
  /** Class name configured as the Kafka producer key serializer. */
  public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
  /** Class name configured as the Kafka producer value serializer. */
  public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

  /** Operand key whose value is used as the Kafka bootstrap servers (and input operator clusters). */
  public static final String KAFKA_SERVERS = "servers";
  /** Operand key whose value is used as the Kafka topic(s) to read from / write to. */
  public static final String KAFKA_TOPICS = "topics";

  private MessageFormat messageFormat;
  private Map<String, Object> operands;

  /**
   * Creates an unconfigured endpoint. {@link #setEndpointOperands(Map)} and
   * {@link #setMessageFormat(MessageFormat)} must be called before the endpoint is used to populate a DAG.
   */
  public KafkaEndpoint()
  {
  }

  /**
   * Creates a fully configured endpoint.
   *
   * @param kafkaServers  value stored under the {@link #KAFKA_SERVERS} operand
   * @param topics        value stored under the {@link #KAFKA_TOPICS} operand
   * @param messageFormat format used to parse incoming / format outgoing messages
   */
  public KafkaEndpoint(String kafkaServers, String topics, MessageFormat messageFormat)
  {
    this.messageFormat = messageFormat;
    this.operands = ImmutableMap.<String, Object>of(KAFKA_SERVERS, kafkaServers, KAFKA_TOPICS, topics);
  }

  /**
   * @return {@link EndpointType#KAFKA}
   */
  @Override
  public EndpointType getTargetType()
  {
    return EndpointType.KAFKA;
  }

  /**
   * Replaces the endpoint operands. The map is stored by reference; it is expected to contain the
   * {@link #KAFKA_SERVERS} and {@link #KAFKA_TOPICS} keys by the time a DAG is populated.
   *
   * @param operands endpoint configuration
   */
  @Override
  public void setEndpointOperands(Map<String, Object> operands)
  {
    this.operands = operands;
  }

  /**
   * @param messageFormat format used to parse incoming / format outgoing messages
   */
  @Override
  public void setMessageFormat(MessageFormat messageFormat)
  {
    this.messageFormat = messageFormat;
  }

  /**
   * Adds a Kafka input operator to the DAG and wires its output port to the first input port of the
   * parser sub-DAG created by the message format.
   *
   * @param dag         DAG to populate
   * @param typeFactory type factory handed to the message format
   * @return a {@link RelInfo} named "Input" with no input ports, exposing the message format's operator,
   *         output port and row type
   * @throws NullPointerException if the message format, the operands, or one of the required operands
   *                              ({@link #KAFKA_SERVERS}, {@link #KAFKA_TOPICS}) has not been set
   * @throws ClassCastException   if a required operand is not a {@link String}
   */
  @Override
  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
  {
    // Validate the whole configuration up front so that a misconfigured endpoint fails with a
    // descriptive message before any operator has been added to the DAG.
    MessageFormat format = requireMessageFormat();
    String servers = (String)requireOperand(KAFKA_SERVERS);
    String topics = (String)requireOperand(KAFKA_TOPICS);

    KafkaSinglePortInputOperator kafkaInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaInput"),
        KafkaSinglePortInputOperator.class);
    kafkaInput.setTopics(topics);
    kafkaInput.setInitialOffset("EARLIEST");

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
    kafkaInput.setConsumerProps(props);
    kafkaInput.setClusters(servers);

    RelInfo spec = format.populateInputDAG(dag, typeFactory);
    dag.addStream(OperatorUtils.getUniqueStreamName("Kafka", "Parser"), kafkaInput.outputPort,
        spec.getInputPorts().get(0));

    return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(),
        format.getRowType(typeFactory));
  }

  /**
   * Adds the formatter sub-DAG created by the message format plus a Kafka output operator, and wires
   * the formatter's output port to the Kafka operator's input port.
   *
   * @param dag         DAG to populate
   * @param typeFactory type factory handed to the message format
   * @return a {@link RelInfo} named "Output" exposing the message format's input ports, operator and
   *         row type; its output port is {@code null}
   * @throws NullPointerException if the message format, the operands, or one of the required operands
   *                              ({@link #KAFKA_SERVERS}, {@link #KAFKA_TOPICS}) has not been set
   * @throws ClassCastException   if the {@link #KAFKA_TOPICS} operand is not a {@link String}
   */
  @Override
  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
  {
    // Validate before mutating the DAG; see populateInputDAG.
    MessageFormat format = requireMessageFormat();
    // The servers operand is passed through to the producer properties as-is (no String cast),
    // exactly as it was before validation was introduced.
    Object servers = requireOperand(KAFKA_SERVERS);
    String topic = (String)requireOperand(KAFKA_TOPICS);

    RelInfo spec = format.populateOutputDAG(dag, typeFactory);

    KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaOutput"),
        KafkaSinglePortOutputOperator.class);
    kafkaOutput.setTopic(topic);

    Properties props = new Properties();
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    kafkaOutput.setProperties(props);

    dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "Kafka"), spec.getOutPort(), kafkaOutput.inputPort);

    return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, format.getRowType(typeFactory));
  }

  /**
   * @param typeFactory type factory handed to the message format
   * @return the row type reported by the message format
   * @throws NullPointerException if the message format has not been set
   */
  @Override
  public RelDataType getRowType(RelDataTypeFactory typeFactory)
  {
    return requireMessageFormat().getRowType(typeFactory);
  }

  /**
   * Returns the configured message format, failing with a descriptive message if none was set.
   */
  private MessageFormat requireMessageFormat()
  {
    return Objects.requireNonNull(messageFormat, "Kafka endpoint message format has not been set");
  }

  /**
   * Returns the non-null value stored under {@code key} in the operands, failing with a descriptive
   * message if the operands map or the entry is missing.
   */
  private Object requireOperand(String key)
  {
    Objects.requireNonNull(operands, "Kafka endpoint operands have not been set");
    return Objects.requireNonNull(operands.get(key), "Kafka endpoint operand '" + key + "' is missing");
  }
}