blob: d92b098e77d645ffe292fe1c39558dcc619c522b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import static;
import static;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kudu.Common;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A bounded source and sink for Kudu.
* <p>For more information, see the online documentation at <a
* href="https://kudu.apache.org/">Kudu</a>.
* <h3>Reading from Kudu</h3>
* <p>{@code KuduIO} provides a source that reads from Kudu and returns a bounded collection of
* entities as {@code PCollection&lt;T&gt;}. An entity is built by parsing a Kudu {@link RowResult}
* using the provided {@link SerializableFunction}.
* <p>The following example illustrates various options for configuring the IO:
* <pre>{@code
* pipeline.apply(
* KuduIO.<String>read()
* .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
* .withTable("table")
* .withParseFn(
* (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
* .withCoder(StringUtf8Coder.of()));
* // above options illustrate a typical minimum set, returns PCollection<String>
* }</pre>
* <p>{@code withCoder(...)} may be omitted if it can be inferred from the {@link CoderRegistry}.
* However, when using a Lambda Expression or an anonymous inner class to define the function, type
* erasure will prohibit this. In such cases you are required to explicitly set the coder as in the
* above example.
* <p>Optionally, you can provide {@code withPredicates(...)} to apply a query to filter rows from
* the kudu table.
* <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned
* from the Kudu scan to improve performance. The columns required in the {@code ParseFn} must be
* declared in the projected columns.
* <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned
* from the Kudu scanner in each batch.
* <p>Optionally, you can provide {@code withFaultTolerent(...)} to make the read resume the scan
* on another tablet server if the current server fails.
* <h3>Writing to Kudu</h3>
* <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link
* PCollection PCollection} and a {@link FormatFunction} which is responsible for converting the
* input into an idempotent transformation on a row.
* <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a
* {@link FormatFunction} to convert the input records, for example:
* <pre>{@code
* PCollection<MyType> data = ...;
* FormatFunction<MyType> fn = ...;
* data.apply("write",
* KuduIO.write()
* .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
* .withTable("table")
* .withFormatFn(fn));
* }</pre>
* <h3>Experimental</h3>
* {@code KuduIO} does not support authentication in this release.
public class KuduIO {
private static final Logger LOG = LoggerFactory.getLogger(KuduIO.class);
private KuduIO() {}
public static <T> Read<T> read() {
return new AutoValue_KuduIO_Read.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
public static <T> Write<T> write() {
return new AutoValue_KuduIO_Write.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
/**
 * An interface used by the KuduIO Write to convert an input record into an Operation to apply as
 * a mutation in Kudu.
 *
 * <p>It is a {@link SerializableFunction} from {@link TableAndRecord} to {@link Operation} and
 * declares no methods of its own.
 *
 * @param <T> the type parameter of the {@link TableAndRecord} handed to the function
 */
public interface FormatFunction<T> extends SerializableFunction<TableAndRecord<T>, Operation> {}
/** Implementation of {@link KuduIO#read()}. */
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
// Configuration properties of the read. validate() and inferCoder() compare several of these
// with null, so an unset property is apparently represented by null.
// NOTE(review): no nullability annotations are visible on these accessors in this view — confirm.

// Kudu master addresses; set by withMasterAddresses from a comma-separated string.
abstract List<String> getMasterAddresses();
// Name of the table to read; set by withTable.
abstract String getTable();
// Batch size for the read; set by withBatchSize.
abstract Integer getBatchSize();
// Columns to include in the read; set by withProjectedColumns.
abstract List<String> getProjectedColumns();
// Row predicates held as Common.ColumnPredicatePB values; set by withPredicates.
abstract List<Common.ColumnPredicatePB> getSerializablePredicates();
// Fault-tolerance flag; set by withFaultTolerent.
abstract Boolean getFaultTolerent();
// Function that turns a RowResult into a T; set by withParseFn.
abstract SerializableFunction<RowResult, T> getParseFn();
// Explicit coder for T; inferCoder() falls back to the coder registry when this is null.
abstract Coder<T> getCoder();
// Service used to create readers for this read; set by KuduIO.read() and withKuduService.
abstract KuduService<T> getKuduService();
// Returns a builder; every with* method uses it to produce the returned Read.
abstract Builder<T> builder();
// Builder for Read: one abstract setter per property declared on Read, plus build().
// KuduIO.read() instantiates it as AutoValue_KuduIO_Read.Builder.
abstract static class Builder<T> {
abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
abstract Builder<T> setTable(String table);
abstract Builder<T> setBatchSize(Integer batchSize);
abstract Builder<T> setProjectedColumns(List<String> projectedColumns);
abstract Builder<T> setSerializablePredicates(
List<Common.ColumnPredicatePB> serializablePredicates);
abstract Builder<T> setFaultTolerent(Boolean faultTolerent);
abstract Builder<T> setParseFn(SerializableFunction<RowResult, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setKuduService(KuduService<T> kuduService);
// Produces the Read carrying the values set on this builder.
abstract Read<T> build();
Coder<T> inferCoder(CoderRegistry coderRegistry) {
try {
return getCoder() != null
? getCoder()
: coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to infer coder for output of parseFn ("
+ TypeDescriptors.outputOf(getParseFn())
+ "). Specify it explicitly using withCoder().",
/** Reads from the Kudu cluster on the specified master addresses. */
public Read<T> withMasterAddresses(String masterAddresses) {
checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
/** Reads from the specified table. */
public Read<T> withTable(String table) {
checkArgument(table != null, "table cannot be null");
return builder().setTable(table).build();
/** Provides the function to parse a row from Kudu into the typed object. */
public Read<T> withParseFn(SerializableFunction<RowResult, T> parseFn) {
checkArgument(parseFn != null, "parseFn cannot be null");
return builder().setParseFn(parseFn).build();
/**
 * Filters the rows read from Kudu using the given predicates.
 *
 * <p>The predicates are stored on the transform as {@link Common.ColumnPredicatePB} values.
 *
 * @param predicates the predicates to apply; must not be null
 * @return a {@link Read} built from this transform's builder with the predicates set
 * @throws IllegalArgumentException if {@code predicates} is null
 */
public Read<T> withPredicates(List<KuduPredicate> predicates) {
checkArgument(predicates != null, "predicates cannot be null");
// reuse the kudu protobuf serialization mechanism
// NOTE(review): the right-hand side of the assignment below is missing in this view of the
// file; presumably it converts each KuduPredicate into its ColumnPredicatePB form — restore it
// from the upstream source before changing this method.
List<Common.ColumnPredicatePB> serializablePredicates =;
return builder().setSerializablePredicates(serializablePredicates).build();
/** Filters the columns read from the table to include only those specified. */
public Read<T> withProjectedColumns(List<String> projectedColumns) {
checkArgument(projectedColumns != null, "projectedColumns cannot be null");
return builder().setProjectedColumns(projectedColumns).build();
/** Reads from the table in batches of the specified size. */
public Read<T> withBatchSize(int batchSize) {
checkArgument(batchSize >= 0, "batchSize must not be negative");
return builder().setBatchSize(batchSize).build();
* Instructs the read scan to resume a scan on another tablet server if the current server fails
* and faultTolerant is set to true.
public Read<T> withFaultTolerent(boolean faultTolerent) {
return builder().setFaultTolerent(faultTolerent).build();
* Sets a {@link Coder} for the result of the parse function. This may be required if a coder
* can not be inferred automatically.
public Read<T> withCoder(Coder<T> coder) {
checkArgument(coder != null, "coder cannot be null");
return builder().setCoder(coder).build();
/** Specify an instance of {@link KuduService} used to connect and read from Kudu. */
Read<T> withKuduService(KuduService<T> kuduService) {
checkArgument(kuduService != null, "kuduService cannot be null");
return builder().setKuduService(kuduService).build();
/**
 * Builds the output collection of this read. The coder is resolved through {@code inferCoder}
 * using the pipeline's coder registry and passed, with this transform and a null token, to a new
 * {@link KuduSource}.
 *
 * @param input the start of the pipeline
 * @return the collection produced by applying the read to {@code input}
 */
public PCollection<T> expand(PBegin input) {
Pipeline p = input.getPipeline();
final Coder<T> coder = inferCoder(p.getCoderRegistry());
// NOTE(review): the statement below is incomplete in this view: text is missing before
// "KuduSource<>" and the parentheses do not balance. Restore it from the upstream source.
return input.apply( KuduSource<>(this, coder, null)));
/**
 * Checks that the master addresses, the table and the parse function have been set.
 *
 * @param pipelineOptions not read by the visible code
 */
public void validate(PipelineOptions pipelineOptions) {
// NOTE(review): the call that wraps each "condition, message" pair below is missing in this
// view, and every message starts with a blank where a prefix presumably stood. Restore both
// from the upstream source.
getMasterAddresses() != null,
" requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
// NOTE(review): this message names withTableName(tableName), but the setter defined on Read is
// withTable(String); the message should probably read withTable(table).
getTable() != null,
" requires a table name to be set via withTableName(tableName)");
getParseFn() != null,
" requires a parse function to be set via withParseFn(parseFn)");
/**
 * Adds the master addresses (as the list's string form) and the table name to the display data
 * under the keys {@code masterAddresses} and {@code table}.
 *
 * @param builder the display data builder to add the items to
 */
public void populateDisplayData(DisplayData.Builder builder) {
// NOTE(review): getMasterAddresses() is dereferenced without a null check here, while
// validate() treats null as a possible value — confirm this is only reached once the addresses
// have been set.
builder.add(DisplayData.item("masterAddresses", getMasterAddresses().toString()));
builder.add(DisplayData.item("table", getTable()));
static class KuduSource<T> extends BoundedSource {
/** The read configuration this source was created from. */
final Read<T> spec;

/** The coder returned by {@code getOutputCoder()}. */
private final Coder<T> coder;

/** Serialized token of a split; null for the unsplit source created by {@code Read.expand}. */
@Nullable byte[] serializedToken; // only during a split

/**
 * Creates a source for the given read.
 *
 * @param spec the read configuration; typed as {@code Read<T>} rather than the raw {@code Read}
 *     so that the assignment to the field is checked by the compiler
 * @param coder the coder for the elements this source produces
 * @param serializedToken the token of the split this source stands for, or {@code null} for the
 *     unsplit source
 */
KuduSource(Read<T> spec, Coder<T> coder, @Nullable byte[] serializedToken) {
  this.spec = spec;
  this.coder = coder;
  this.serializedToken = serializedToken;
}
/**
 * Splits this source. A source that already carries a serialized token is returned unchanged as
 * a single-element list; otherwise one {@link KuduSource} is created per element of the stream
 * built below, each receiving that element as its token.
 *
 * @param desiredBundleSizeBytes not read by the visible code
 * @param options not read by the visible code
 * @return the sources to read from
 * @throws KuduException declared by the signature; NOTE(review): presumably raised by the call
 *     that is missing from the stream expression below — confirm
 */
// A Kudu source can be split once only providing a source per tablet
public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
throws KuduException {
if (serializedToken != null) {
return Collections.singletonList(this); // we are already a split
} else {
// NOTE(review): the expression that produces the stream is missing in this view (the
// assignment jumps straight to ".map"). Restore it from the upstream source.
Stream<BoundedSource<T>> sources =
// NOTE(review): the new sources are given spec.getCoder(), not this.coder. When the coder was
// inferred by Read.inferCoder, spec.getCoder() is null, so the split sources would return null
// from getOutputCoder(). Passing `coder` looks like the intended argument — confirm and fix.
.map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s));
return sources.collect(Collectors.toList());
public long getEstimatedSizeBytes(PipelineOptions options) {
return 0; // Kudu does not expose tablet sizes
public BoundedReader<T> createReader(PipelineOptions options) {
return spec.getKuduService().createReader(this);
public Coder<T> getOutputCoder() {
return coder;
* A {@link PTransform} that writes to Kudu. See the class-level Javadoc on {@link KuduIO} for
* more information.
* @see KuduIO
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
// Configuration properties of the write. validate() compares the first three with null, so an
// unset property is apparently represented by null.
// NOTE(review): no nullability annotations are visible on these accessors in this view — confirm.

// Kudu master addresses; set by withMasterAddresses from a comma-separated string.
abstract List<String> masterAddresses();
// Name of the table to write to; set by withTable.
abstract String table();
// Function that turns an input into an Operation; set by withFormatFn.
abstract FormatFunction<T> formatFn();
// Service used to create the writer; set by KuduIO.write() and withKuduService.
abstract KuduService<T> kuduService();
// Returns a builder; every with* method uses it to produce the returned Write.
abstract Builder<T> builder();
// Builder for Write: one abstract setter per property declared on Write, plus build().
// KuduIO.write() instantiates it as AutoValue_KuduIO_Write.Builder.
abstract static class Builder<T> {
abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
abstract Builder<T> setTable(String table);
abstract Builder<T> setFormatFn(FormatFunction<T> formatFn);
abstract Builder<T> setKuduService(KuduService<T> kuduService);
// Produces the Write carrying the values set on this builder.
abstract Write<T> build();
/** Writes to the Kudu cluster on the specified master addresses. */
public Write withMasterAddresses(String masterAddresses) {
checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
/** Writes to the specified table. */
public Write withTable(String table) {
checkArgument(table != null, "table cannot be null");
return builder().setTable(table).build();
/** Writes using the given function to create the mutation operations from the input. */
public Write withFormatFn(FormatFunction<T> formatFn) {
checkArgument(formatFn != null, "formatFn cannot be null");
return builder().setFormatFn(formatFn).build();
/** Specify the {@link KuduService} used to connect and write into the Kudu table. */
Write<T> withKuduService(KuduService<T> kuduService) {
checkArgument(kuduService != null, "kuduService cannot be null");
return builder().setKuduService(kuduService).build();
/**
 * Applies a {@link ParDo} of a new {@code WriteFn}, constructed with this transform, to the
 * input.
 *
 * @param input the elements to write
 */
public PDone expand(PCollection<T> input) {
input.apply(ParDo.of(new WriteFn(this)));
// NOTE(review): no return statement is visible although the method is declared to return
// PDone; the line appears to be missing from this view. Restore it from the upstream source.
/**
 * Checks that the master addresses, the table and the format function have been set.
 *
 * @param pipelineOptions not read by the visible code
 */
public void validate(PipelineOptions pipelineOptions) {
// NOTE(review): the call that wraps each "condition, message" pair below is missing in this
// view. Restore it from the upstream source.
masterAddresses() != null,
"KuduIO.write() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
table() != null, "KuduIO.write() requires a table name to be set via withTable(table)");
formatFn() != null,
"KuduIO.write() requires a format function to be set via withFormatFn(formatFn)");
/**
 * Adds the master addresses (as the list's string form), the table name and the canonical class
 * name of the format function to the display data, under the keys {@code masterAddresses},
 * {@code tableName} and {@code formatFn}.
 *
 * @param builder the display data builder to add the items to
 */
public void populateDisplayData(DisplayData.Builder builder) {
// NOTE(review): masterAddresses() and formatFn() are dereferenced without a null check here,
// while validate() treats null as a possible value — confirm this is only reached once both
// have been set.
builder.add(DisplayData.item("masterAddresses", masterAddresses().toString()));
// NOTE(review): the key here is "tableName", whereas Read and WriteFn use "table" for the same
// value — confirm whether the difference is intended.
builder.add(DisplayData.item("tableName", table()));
builder.add(DisplayData.item("formatFn", formatFn().getClass().getCanonicalName()));
/**
 * The {@link DoFn} applied by {@code Write.expand}. It holds the {@link Write} configuration and
 * a writer obtained from the configured {@link KuduService}.
 *
 * <p>Declared as a non-static inner class, so every instance also references its enclosing
 * {@code Write}.
 */
private class WriteFn extends DoFn<T, Void> {
// The write configuration passed to the constructor.
private final Write<T> spec;
// Created in setup() and cleared in teardown(). Declared with the raw KuduService.Writer type.
private KuduService.Writer writer;
WriteFn(Write<T> spec) {
this.spec = spec;
// Creates the writer through the service of the write configuration.
public void setup() throws KuduException {
writer = spec.kuduService().createWriter(spec);
// NOTE(review): the bodies of startBundle, processElement and finishBundle are not visible in
// this view (no statements follow their signatures), and no DoFn lifecycle annotations are
// visible either. Restore them from the upstream source before editing this class.
public void startBundle(StartBundleContext context) throws KuduException {
public void processElement(ProcessContext c) throws KuduException {
public void finishBundle() throws Exception {
// NOTE(review): the only visible statement drops the reference to the writer; no call that
// releases it is visible — confirm against the upstream source that the writer is closed first.
public void teardown() throws Exception {
writer = null;
// Adds the master addresses (as the list's string form) and the table name of the write
// configuration to the display data under the keys "masterAddresses" and "table".
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("masterAddresses", spec.masterAddresses().toString()));
builder.add(DisplayData.item("table", spec.table()));