blob: 9f5c265d3422985209e85140a2bed399eb8195be [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.kafka;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.edgent.connectors.kafka.runtime.KafkaProducerConnector;
import org.apache.edgent.connectors.kafka.runtime.KafkaPublisher;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
/**
* {@code KafkaProducer} is a connector for publishing a stream of tuples
* to Apache Kafka messaging system topics.
* <p>
* The connector uses and includes components from the Kafka 0.8.2.2 release.
* It has been successfully tested against kafka_2.11-0.10.1.0 and kafka_2.11-0.9.0.0 server as well.
* For more information about Kafka see
* <a href="http://kafka.apache.org">http://kafka.apache.org</a>
* <p>
* Sample use:
* <pre>{@code
* String bootstrapServers = "localhost:9092";
* String topic = "mySensorReadingsTopic";
*
* Map<String,Object> config = new HashMap<>();
* config.put("bootstrap.servers", bootstrapServers);
*
* Topology t = ...
* KafkaProducer kafka = new KafkaProducer(t, () -> config);
*
* TStream<JsonObject> sensorReadings = t.poll(
* () -> getSensorReading(), 5, TimeUnit.SECONDS);
*
* // publish the sensor readings as JSON text, with no key and no explicit partition
* kafka.publish(sensorReadings, null, tuple -> tuple.toString(), tuple -> topic, null);
* }</pre>
*/
public class KafkaProducer {

    /** Topology this connector was created for; retained but not otherwise read by this class. */
    @SuppressWarnings("unused")
    private final Topology t;

    /** Runtime connector handed to every publisher created by this instance. */
    private final KafkaProducerConnector connector;

    /**
     * Create a producer connector for publishing tuples to Kafka topics.
     * <p>
     * See the Apache Kafka documentation for {@code KafkaProducer}
     * configuration properties at <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
     * Configuration property values are strings.
     * <p>
     * The Kafka "New Producer configs" are used. Minimal configuration
     * typically includes:
     * <ul>
     * <li><code>bootstrap.servers</code></li>
     * </ul>
     * <p>
     * The full set of producer configuration items are specified in
     * {@code org.apache.kafka.clients.producer.ProducerConfig}
     *
     * @param t Topology to add to
     * @param config supplier of the KafkaProducer configuration information.
     */
    public KafkaProducer(Topology t, Supplier<Map<String,Object>> config) {
        this.t = t;
        this.connector = new KafkaProducerConnector(config);
    }

    /**
     * Publish the stream of tuples as Kafka key/value records
     * to the specified topic partitions.
     * <p>
     * If a valid partition number is specified that partition will be used
     * when sending the message. If no partition is specified but a key is
     * present a partition will be chosen using a hash of the key.
     * If neither key nor partition is present a partition will be assigned
     * in a round-robin fashion.
     *
     * @param <T> Tuple type
     * @param stream the stream to publish
     * @param keyFn A function that yields an optional byte[]
     *        Kafka record's key from the tuple.
     *        Specify null or return a null value for no key.
     * @param valueFn A function that yields the byte[]
     *        Kafka record's value from the tuple.
     * @param topicFn A function that yields the topic from the tuple.
     * @param partitionFn A function that yields the optional topic
     *        partition specification from the tuple.
     *        Specify null or return a null value for no partition specification.
     * @return {@link TSink}
     */
    public <T> TSink<T> publishBytes(
            TStream<T> stream,
            Function<T,byte[]> keyFn,
            Function<T,byte[]> valueFn,
            Function<T,String> topicFn,
            Function<T,Integer> partitionFn) {
        KafkaPublisher<T> publisher =
                new KafkaPublisher<T>(connector, keyFn, valueFn, topicFn, partitionFn);
        return stream.sink(publisher);
    }

    /**
     * Publish the stream of tuples as Kafka key/value records
     * to the specified partitions of the specified topics.
     * <p>
     * This is a convenience method for {@code String} typed key/value
     * conversion functions: the strings they yield are encoded as UTF-8
     * and the work is delegated to
     * {@link #publishBytes(TStream, Function, Function, Function, Function)}.
     *
     * @param <T> Tuple type
     * @param stream the stream to publish
     * @param keyFn A function that yields an optional String
     *        Kafka record's key from the tuple.
     *        Specify null or return a null value for no key.
     * @param valueFn A function that yields the String for the
     *        Kafka record's value from the tuple.
     * @param topicFn A function that yields the topic from the tuple.
     * @param partitionFn A function that yields the optional topic
     *        partition specification from the tuple.
     *        Specify null or return a null value for no partition specification.
     * @return {@link TSink}
     * @see #publishBytes(TStream, Function, Function, Function, Function)
     */
    public <T> TSink<T> publish(
            TStream<T> stream,
            Function<T,String> keyFn,
            Function<T,String> valueFn,
            Function<T,String> topicFn,
            Function<T,Integer> partitionFn) {
        // A null keyFn stays null so the byte[] variant sees "no key function";
        // otherwise adapt it, letting a null key pass through untouched.
        final Function<T,byte[]> keyBytesFn = (keyFn == null)
                ? null
                : tuple -> utf8OrNull(keyFn.apply(tuple));

        // The value is encoded unconditionally: it is dereferenced as-is.
        final Function<T,byte[]> valueBytesFn = tuple -> {
            String value = valueFn.apply(tuple);
            return value.getBytes(StandardCharsets.UTF_8);
        };

        return publishBytes(stream, keyBytesFn, valueBytesFn, topicFn, partitionFn);
    }

    /**
     * Publish a stream of {@code String} tuples to a single fixed topic.
     * <p>
     * This is a convenience method for a String stream published
     * as a Kafka record with no key and
     * a value consisting of the String tuple serialized as UTF-8,
     * and publishing round-robin to a fixed topic's partitions.
     *
     * @param stream the stream to publish
     * @param topic The topic to publish to
     * @return {@link TSink}
     * @see #publish(TStream, Function, Function, Function, Function)
     */
    public TSink<String> publish(TStream<String> stream, String topic) {
        final Function<String,String> tupleAsValueFn = tuple -> tuple;
        final Function<String,String> fixedTopicFn = tuple -> topic;
        // No key function and no partition function are supplied.
        return publish(stream, null, tupleAsValueFn, fixedTopicFn, null);
    }

    /** Encode {@code text} as UTF-8 bytes, mapping {@code null} to {@code null}. */
    private static byte[] utf8OrNull(String text) {
        if (text == null) {
            return null;
        }
        return text.getBytes(StandardCharsets.UTF_8);
    }
}