// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package kafkaio contains cross-language functionality for using Apache Kafka
// (http://kafka.apache.org/). These transforms only work on runners that
// support cross-language transforms.
//
// Setup
//
// Transforms specified here are cross-language transforms implemented in a
// different SDK (listed below). During pipeline construction, the Go SDK will
// need to connect to an expansion service containing information on these
// transforms in their native SDK.
//
// To use an expansion service, it must be run as a separate process accessible
// during pipeline construction. The address of that process must be passed to
// the transforms in this package.
//
// The version of the expansion service should match the version of the Beam SDK
// being used. For numbered releases of Beam, these expansion services are
// released to the Maven repository as modules. For development versions of
// Beam, it is recommended to build and run the expansion service from source
// using Gradle.
//
// Current supported SDKs, including expansion service modules and reference
// documentation:
// * Java
// - Vendored Module: beam-sdks-java-io-expansion-service
// - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
// - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
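//
// As a minimal sketch (assuming an expansion service is already running at
// localhost:8097, and assuming the bootstrap server address and topic names
// shown here), a pipeline can wire both transforms to the same service
// address:
//
// expansionAddr := "localhost:8097"
// bootstrapServers := "bootstrap-server:9092"
// p, s := beam.NewPipelineWithRoot()
// msgs := kafkaio.Read(s, expansionAddr, bootstrapServers, []string{"in_topic"})
// kafkaio.Write(s, expansionAddr, bootstrapServers, "out_topic", msgs)
// // p can then be run on a runner that supports cross-language transforms.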
package kafkaio
// TODO(BEAM-12492): Implement an API for specifying Kafka type serializers and
// deserializers.
import (
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
func init() {
beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
}
// policy is the name of a timestamp policy for extracting timestamps from
// Kafka records. See the TimestampPolicy option and the predefined policy
// constants in this package.
type policy string
const (
// ByteArrayDeserializer is the fully qualified class name of Kafka's built-in
// byte array deserializer, used as the default key and value deserializer for
// Read.
ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
// ByteArraySerializer is the fully qualified class name of Kafka's built-in
// byte array serializer, used as the default key and value serializer for
// Write.
ByteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
// ProcessingTime is a timestamp policy that assigns processing time to
// each record. Specifically, this is the timestamp when the record becomes
// "current" in the reader. Further documentation can be found in Java's
// KafkaIO documentation.
ProcessingTime policy = "ProcessingTime"
// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
// Kafka records. Requires the records' timestamp type to be
// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
// documentation can be found in Java's KafkaIO documentation.
CreateTime policy = "CreateTime"
// LogAppendTime is a timestamp policy that assigns Kafka's log append time
// (server side ingestion time) to each record. Further documentation can
// be found in Java's KafkaIO documentation.
LogAppendTime policy = "LogAppendTime"
readURN = "beam:external:java:kafkaio:typedwithoutmetadata:v1"
writeURN = "beam:external:java:kafka:write:v1"
)
// Read is a cross-language PTransform which reads from Kafka and returns a
// KV pair for each item in the specified Kafka topics. By default, this runs
// as an unbounded transform and outputs keys and values as byte slices.
// These properties can be changed through optional parameters.
//
// Read requires the address of an expansion service for Kafka Read transforms,
// a comma-separated list of bootstrap server addresses (see the Kafka property
// "bootstrap.servers" for details), and at least one topic to read from.
//
// Read also accepts optional parameters as readOptions. All optional parameters
// are predefined in this package as functions that return readOption. To set
// an optional parameter, call the corresponding function and pass its result
// as an additional argument to Read.
//
// Example of Read with required and optional parameters:
//
// expansionAddr := "localhost:1234"
// bootstrapServer := "bootstrap-server:1234"
// topic := "topic_name"
// pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
// kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
s = s.Scope("kafkaio.Read")
if len(topics) == 0 {
panic("kafkaio.Read requires at least one topic to read from.")
}
rpl := readPayload{
ConsumerConfig: map[string]string{"bootstrap.servers": servers},
Topics: topics,
KeyDeserializer: ByteArrayDeserializer,
ValueDeserializer: ByteArrayDeserializer,
TimestampPolicy: string(ProcessingTime),
}
rcfg := readConfig{
pl: &rpl,
key: reflectx.ByteSlice,
val: reflectx.ByteSlice,
}
for _, opt := range opts {
opt(&rcfg)
}
pl := beam.CrossLanguagePayload(rpl)
outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
return out[graph.UnnamedOutputTag]
}
type readOption func(*readConfig)
type readConfig struct {
pl *readPayload
key reflect.Type
val reflect.Type
}
// ConsumerConfigs is a Read option that adds consumer properties to the
// Consumer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
//
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Read and does not need to be specified via this option.
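//
// For example, to read from the earliest available offset (a sketch;
// "auto.offset.reset" is a standard Kafka consumer property, not something
// defined by this package):
//
// kafkaio.Read(s, addr, servers, topics,
//     kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))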
func ConsumerConfigs(cfgs map[string]string) readOption {
return func(cfg *readConfig) {
for k, v := range cfgs {
cfg.pl.ConsumerConfig[k] = v
}
}
}
// StartReadTimestamp is a Read option that specifies a start timestamp in
// milliseconds since the Unix epoch; only records with timestamps at or after
// that point will be read.
//
// This results in failures if one or more partitions don't contain messages
// with a timestamp larger than or equal to the one specified, or if the
// message format version in a partition is before 0.10.0, meaning messages do
// not have timestamps.
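//
// For example, to read only records from roughly the last hour (a sketch;
// assumes the records in each partition carry timestamps):
//
// startMillis := time.Now().Add(-time.Hour).UnixMilli()
// kafkaio.Read(s, addr, servers, topics, kafkaio.StartReadTimestamp(startMillis))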
func StartReadTimestamp(ts int64) readOption {
return func(cfg *readConfig) {
cfg.pl.StartReadTime = &ts
}
}
// MaxNumRecords is a Read option that specifies the maximum number of records
// to be read. Setting this will cause the Read to execute as a bounded
// transform. Useful for tests and demo applications.
func MaxNumRecords(num int64) readOption {
return func(cfg *readConfig) {
cfg.pl.MaxNumRecords = &num
}
}
// MaxReadSecs is a Read option that specifies the maximum amount of time in
// seconds the transform executes. Setting this will cause the Read to execute
// as a bounded transform. Useful for tests and demo applications.
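//
// For example, to read for at most 30 seconds in a demo pipeline (a sketch):
//
// kafkaio.Read(s, addr, servers, topics, kafkaio.MaxReadSecs(30))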
func MaxReadSecs(secs int64) readOption {
return func(cfg *readConfig) {
cfg.pl.MaxReadTime = &secs
}
}
// CommitOffsetInFinalize is a Read option that specifies whether to commit
// offsets when finalizing.
//
// Default: false
func CommitOffsetInFinalize(enabled bool) readOption {
return func(cfg *readConfig) {
cfg.pl.CommitOffsetInFinalize = enabled
}
}
// TimestampPolicy is a Read option that specifies the timestamp policy to use
// for extracting timestamps from the KafkaRecord. Must be one of the predefined
// constant timestamp policies in this package.
//
// Default: kafkaio.ProcessingTime
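//
// For example, to use Kafka's log append time as each element's timestamp
// (a sketch):
//
// kafkaio.Read(s, addr, servers, topics, kafkaio.TimestampPolicy(kafkaio.LogAppendTime))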
func TimestampPolicy(name policy) readOption {
return func(cfg *readConfig) {
cfg.pl.TimestampPolicy = string(name)
}
}
// readPayload should produce a schema matching the expected cross-language
// payload for Kafka reads. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Read.External.Configuration.
type readPayload struct {
ConsumerConfig map[string]string
Topics []string
KeyDeserializer string
ValueDeserializer string
StartReadTime *int64
MaxNumRecords *int64
MaxReadTime *int64
CommitOffsetInFinalize bool
TimestampPolicy string
}
// Write is a cross-language PTransform which writes KV data to a specified
// Kafka topic. By default, it expects keys and values to be received as byte
// slices. This can be changed through optional parameters.
//
// Write requires the address of an expansion service for Kafka Write
// transforms, a comma-separated list of bootstrap server addresses (see the
// Kafka property "bootstrap.servers" for details), and a topic to write to.
//
// Write also accepts optional parameters as writeOptions. All optional
// parameters are predefined in this package as functions that return
// writeOption. To set an optional parameter, call the corresponding function
// and pass its result as an additional argument to Write.
//
// Example of Write with required and optional parameters, where pcol is a
// PCollection of KV<[]byte, []byte> elements produced earlier in the pipeline:
//
// expansionAddr := "localhost:1234"
// bootstrapServer := "bootstrap-server:1234"
// topic := "topic_name"
// kafkaio.Write(s, expansionAddr, bootstrapServer, topic, pcol,
//     kafkaio.ProducerConfigs(map[string]string{"retries": "3"}))
func Write(s beam.Scope, addr, servers, topic string, col beam.PCollection, opts ...writeOption) {
s = s.Scope("kafkaio.Write")
wpl := writePayload{
ProducerConfig: map[string]string{"bootstrap.servers": servers},
Topic: topic,
KeySerializer: ByteArraySerializer,
ValueSerializer: ByteArraySerializer,
}
for _, opt := range opts {
opt(&wpl)
}
pl := beam.CrossLanguagePayload(wpl)
beam.CrossLanguage(s, writeURN, pl, addr, beam.UnnamedInput(col), nil)
}
type writeOption func(*writePayload)
// ProducerConfigs is a Write option that adds producer properties to the
// Producer configuration of the transform. Each usage of this adds the given
// elements to the existing map without removing existing elements.
//
// Note that the "bootstrap.servers" property is automatically set by
// kafkaio.Write and does not need to be specified via this option.
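//
// For example, to require acknowledgement from all in-sync replicas before a
// write is considered complete (a sketch; "acks" is a standard Kafka producer
// property, not something defined by this package):
//
// kafkaio.Write(s, addr, servers, topic, col,
//     kafkaio.ProducerConfigs(map[string]string{"acks": "all"}))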
func ProducerConfigs(cfgs map[string]string) writeOption {
return func(pl *writePayload) {
for k, v := range cfgs {
pl.ProducerConfig[k] = v
}
}
}
// writePayload should produce a schema matching the expected cross-language
// payload for Kafka writes. An example of this on the receiving end can be
// found in the Java SDK class
// org.apache.beam.sdk.io.kafka.KafkaIO.Write.External.Configuration.
type writePayload struct {
ProducerConfig map[string]string
Topic string
KeySerializer string
ValueSerializer string
}