// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package kafkaio contains cross-language functionality for using Apache Kafka
// (https://kafka.apache.org/). These transforms only work on runners that
// support cross-language transforms.
// Setup
// Transforms specified here are cross-language transforms implemented in a
// different SDK (listed below). During pipeline construction, the Go SDK will
// need to connect to an expansion service containing information on these
// transforms in their native SDK.
// To use an expansion service, it must be run as a separate process accessible
// during pipeline construction. The address of that process must be passed to
// the transforms in this package.
// The version of the expansion service should match the version of the Beam SDK
// being used. For numbered releases of Beam, these expansions services are
// released to the Maven repository as modules. For development versions of
// Beam, it is recommended to build and run it from source using Gradle.
// Current supported SDKs, including expansion service modules and reference
// documentation:
// * Java
// - Vendored Module: beam-sdks-java-io-expansion-service
// - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
// - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
package kafkaio
// TODO(BEAM-12492): Implement an API for specifying Kafka type serializers and
// deserializers.
import (
func init() {
// policy names a timestamp policy that can be selected for kafkaio.Read via
// the TimestampPolicy option. Only the constants declared below are intended
// to be used as values.
type policy string

const (
	// ByteArrayDeserializer is the fully qualified Java class name of Kafka's
	// byte array deserializer. Read uses it by default for both keys and
	// values.
	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
	// ByteArraySerializer is the fully qualified Java class name of Kafka's
	// byte array serializer. Write uses it by default for both keys and
	// values.
	ByteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"

	// ProcessingTime is a timestamp policy that assigns processing time to
	// each record. Specifically, this is the timestamp when the record becomes
	// "current" in the reader. Further documentation can be found in Java's
	// KafkaIO documentation.
	ProcessingTime policy = "ProcessingTime"
	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
	// kafka records. Requires the records to have a type set to
	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
	// documentation can be found in Java's KafkaIO documentation.
	CreateTime policy = "CreateTime"
	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
	// (server side ingestion time) to each record. Further documentation can
	// be found in Java's KafkaIO documentation.
	LogAppendTime policy = "LogAppendTime"

	// readURN and writeURN identify the cross-language Kafka transforms that
	// Read and Write request from the expansion service.
	readURN  = "beam:external:java:kafkaio:typedwithoutmetadata:v1"
	writeURN = "beam:external:java:kafka:write:v1"
)
// Read is a cross-language PTransform which reads from Kafka and returns a
// KV pair for each item in the specified Kafka topics. By default, this runs
// as an unbounded transform and outputs keys and values as byte slices.
// These properties can be changed through optional parameters.
// Read requires the address for an expansion service for Kafka Read transforms,
// a comma-seperated list of bootstrap server addresses (see the Kafka property
// "bootstrap.servers" for details), and at least one topic to read from.
// Read also accepts optional parameters as readOptions. All optional parameters
// are predefined in this package as functions that return readOption. To set
// an optional parameter, call the function within Read's function signature.
// Example of Read with required and optional parameters:
// expansionAddr := "localhost:1234"
// bootstrapServer := "bootstrap-server:1234"
// topic := "topic_name"
// pcol := kafkaio.Read( s, expansionAddr, bootstrapServer, []string{topic},
// kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
s = s.Scope("kafkaio.Read")
if len(topics) == 0 {
panic("kafkaio.Read requires at least one topic to read from.")
rpl := readPayload{
ConsumerConfig: map[string]string{"bootstrap.servers": servers},
Topics: topics,
KeyDeserializer: ByteArrayDeserializer,
ValueDeserializer: ByteArrayDeserializer,
TimestampPolicy: string(ProcessingTime),
rcfg := readConfig{
pl: &rpl,
key: reflectx.ByteSlice,
val: reflectx.ByteSlice,
for _, opt := range opts {
pl := beam.CrossLanguagePayload(rpl)
outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
return out[graph.UnnamedOutputTag]
// readOption is a functional option for Read. Each option receives the
// readConfig that Read builds and may modify it before the cross-language
// payload is created.
type readOption func(*readConfig)
type readConfig struct {
pl *readPayload
key reflect.Type
val reflect.Type
// ConsumerConfigs is a Read option that adds consumer properties to the
// Consumer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Read and does not need to be specified via this option.
func ConsumerConfigs(cfgs map[string]string) readOption {
return func(cfg *readConfig) {
for k, v := range cfgs {[k] = v
// StartReadTimestamp is a Read option that specifies a start timestamp in
// milliseconds epoch, so only records after that timestamp will be read.
// This results in failures if one or more partitions don't contain messages
// with a timestamp larger than or equal to the one specified, or if the
// message format version in a partition is before 0.10.0, meaning messages do
// not have timestamps.
func StartReadTimestamp(ts int64) readOption {
return func(cfg *readConfig) { = &ts
// MaxNumRecords is a Read option that specifies the maximum amount of records
// to be read. Setting this will cause the Read to execute as a bounded
// transform. Useful for tests tests and demo applications.
func MaxNumRecords(num int64) readOption {
return func(cfg *readConfig) { = &num
// MaxReadSecs is a Read option that specifies the maximum amount of time in
// seconds the transform executes. Setting this will cause the Read to execute
// as a bounded transform. Useful for tests and demo applications.
func MaxReadSecs(secs int64) readOption {
return func(cfg *readConfig) { = &secs
// CommitOffsetInFinalize is a Read option that specifies whether to commit
// offsets when finalizing.
// Default: false
func CommitOffsetInFinalize(enabled bool) readOption {
return func(cfg *readConfig) { = enabled
// TimestampPolicy is a Read option that specifies the timestamp policy to use
// for extracting timestamps from the KafkaRecord. Must be one of the predefined
// constant timestamp policies in this package.
// Default: kafkaio.ProcessingTime
func TimestampPolicy(name policy) readOption {
return func(cfg *readConfig) { = string(name)
// readPayload should produce a schema matching the expected cross-language
// payload for Kafka reads. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Read.External.Configuration.
//
// The field names and types determine the generated schema, so they must stay
// in sync with what the receiving transform expects.
type readPayload struct {
	ConsumerConfig         map[string]string // Kafka consumer properties; Read sets "bootstrap.servers"
	Topics                 []string          // topics to read from
	KeyDeserializer        string            // Java class name of the key deserializer
	ValueDeserializer      string            // Java class name of the value deserializer
	StartReadTime          *int64            // optional start timestamp (milliseconds epoch); nil when unset
	MaxNumRecords          *int64            // optional cap on the number of records; nil when unset
	MaxReadTime            *int64            // optional cap on read duration in seconds; nil when unset
	CommitOffsetInFinalize bool              // whether offsets are committed when finalizing
	TimestampPolicy        string            // name of the timestamp policy to apply
}
// Write is a cross-language PTransform which writes KV data to a specified
// Kafka topic. By default, this assumes keys and values to be received as
// byte slices. This can be changed through optional parameters.
// Write requires the address for an expansion service for Kafka Write
// transforms, a comma-seperated list of bootstrap server addresses (see the
// Kafka property "bootstrap.servers" for details), and a topic to write to.
// Write also accepts optional parameters as writeOptions. All optional
// parameters are predefined in this package as functions that return
// writeOption. To set an optional parameter, call the function within Write's
// function signature.
// Example of Write with required and optional parameters:
// expansionAddr := "localhost:1234"
// bootstrapServer := "bootstrap-server:1234"
// topic := "topic_name"
// pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, topic,
// kafkaio.ValueSerializer("foo.BarSerializer"))
func Write(s beam.Scope, addr, servers, topic string, col beam.PCollection, opts ...writeOption) {
s = s.Scope("kafkaio.Write")
wpl := writePayload{
ProducerConfig: map[string]string{"bootstrap.servers": servers},
Topic: topic,
KeySerializer: ByteArraySerializer,
ValueSerializer: ByteArraySerializer,
for _, opt := range opts {
pl := beam.CrossLanguagePayload(wpl)
beam.CrossLanguage(s, writeURN, pl, addr, beam.UnnamedInput(col), nil)
// writeOption is a functional option for Write. Each option receives the
// writePayload that Write builds and may modify it before the payload is
// encoded for the cross-language transform.
type writeOption func(*writePayload)
// ProducerConfigs is a Write option that adds producer properties to the
// Producer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Write and does not need to be specified via this option.
func ProducerConfigs(cfgs map[string]string) writeOption {
return func(pl *writePayload) {
for k, v := range cfgs {
pl.ProducerConfig[k] = v
// writePayload should produce a schema matching the expected cross-language
// payload for Kafka writes. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Write.External.Configuration.
//
// The field names and types determine the generated schema, so they must stay
// in sync with what the receiving transform expects.
type writePayload struct {
	ProducerConfig  map[string]string // Kafka producer properties; Write sets "bootstrap.servers"
	Topic           string            // topic to write to
	KeySerializer   string            // Java class name of the key serializer
	ValueSerializer string            // Java class name of the value serializer
}