/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import static org.apache.beam.sdk.io.gcp.spanner.SpannerIO.WriteGrouped.decode;
import static org.apache.beam.sdk.io.gcp.spanner.SpannerIO.WriteGrouped.encode;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceFactory;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Mutation.Op;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedBytes;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Experimental {@link PTransform Transforms} for reading from and writing to <a
* href="https://cloud.google.com/spanner">Google Cloud Spanner</a>.
*
* <h3>Reading from Cloud Spanner</h3>
*
* <p>To read from Cloud Spanner, apply the {@link SpannerIO.Read} transform. It returns a
* {@link PCollection} of {@link Struct Structs}, where each element represents an individual row
* returned from the read operation. Both Query and Read APIs are supported. See more information
* about <a href="https://cloud.google.com/spanner/docs/reads">reading from Cloud Spanner</a>.
*
* <p>To execute a <strong>query</strong>, specify the query using {@link
* SpannerIO.Read#withQuery(Statement)} or {@link SpannerIO.Read#withQuery(String)} when
* constructing the transform.
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withQuery("SELECT id, name, email FROM users"));
* }</pre>
*
* <p>To use the Read API, specify a {@link SpannerIO.Read#withTable(String) table name} and a
* {@link SpannerIO.Read#withColumns(List) list of columns}.
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withTable("users")
*         .withColumns("id", "name", "email"));
* }</pre>
*
* <p>To read from an index optimally, specify the index name using {@link SpannerIO.Read#withIndex}.
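*
* <p>For example (a minimal sketch; the index name {@code users_by_email} is illustrative and
* assumes a matching secondary index exists on the table):
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withTable("users")
*         .withColumns("id", "email")
*         .withIndex("users_by_email"));
* }</pre>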
*
* <p>The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the
* power of read-only transactions. Staleness of data can be controlled using the {@link
* SpannerIO.Read#withTimestampBound} or {@link SpannerIO.Read#withTimestamp(Timestamp)} methods.
* <a href="https://cloud.google.com/spanner/docs/transactions">Read more</a> about transactions in
* Cloud Spanner.
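*
* <p>For example, to read a snapshot that is exactly 15 seconds stale (a sketch; the staleness
* value is illustrative only):
*
* <pre>{@code
* PCollection<Struct> rows = p.apply(
*     SpannerIO.read()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId)
*         .withQuery("SELECT id, name, email FROM users")
*         .withTimestampBound(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS)));
* }</pre>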
*
* <p>It is possible to read several {@link PCollection PCollections} within a single transaction.
* Apply the {@link SpannerIO#createTransaction()} transform, which lazily creates a transaction.
* The result of this transform can be passed to a read operation using {@link
* SpannerIO.Read#withTransaction(PCollectionView)}.
*
* <pre>{@code
* SpannerConfig spannerConfig = ...
*
* PCollectionView<Transaction> tx =
*     p.apply(
*         SpannerIO.createTransaction()
*             .withSpannerConfig(spannerConfig)
*             .withTimestampBound(TimestampBound.strong()));
*
* PCollection<Struct> users = p.apply(
*     SpannerIO.read()
*         .withSpannerConfig(spannerConfig)
*         .withQuery("SELECT name, email FROM users")
*         .withTransaction(tx));
*
* PCollection<Struct> tweets = p.apply(
*     SpannerIO.read()
*         .withSpannerConfig(spannerConfig)
*         .withQuery("SELECT user, tweet, date FROM tweets")
*         .withTransaction(tx));
* }</pre>
*
* <h3>Writing to Cloud Spanner</h3>
*
* <p>The Cloud Spanner {@link SpannerIO.Write} transform writes to Cloud Spanner by executing a
* collection of input row {@link Mutation Mutations}. The mutations are grouped into batches for
* efficiency.
*
* <p>To configure the write transform, create an instance using {@link #write()} and then specify
* the destination Cloud Spanner instance ({@link Write#withInstanceId(String)}) and destination
* database ({@link Write#withDatabaseId(String)}). For example:
*
* <pre>{@code
* // Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
* PCollection<Mutation> mutations = ...;
* // Write mutations.
* SpannerWriteResult result = mutations.apply(
*     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
* }</pre>
*
* <h3>SpannerWriteResult</h3>
*
* The {@link SpannerWriteResult} object contains the results of the transform, including a {@link
* PCollection} of {@link MutationGroup MutationGroups} that failed to write, and a {@link
* PCollection} that can be used in batch pipelines as a completion signal to {@link
* org.apache.beam.sdk.transforms.Wait Wait.OnSignal} to indicate when all input has been written.
* Note that in streaming pipelines, this signal will never be triggered, as the input is unbounded
* and this {@link PCollection} uses the {@link GlobalWindow}.
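*
* <p>For example (a sketch; {@code otherPCollection} is illustrative, and {@link
* FailureMode#REPORT_FAILURES} is only needed if failed mutations should be inspected):
*
* <pre>{@code
* SpannerWriteResult result = mutations.apply(
*     "Write", SpannerIO.write()
*         .withInstanceId("instance")
*         .withDatabaseId("database")
*         .withFailureMode(FailureMode.REPORT_FAILURES));
*
* // Inspect the mutation groups that could not be written.
* PCollection<MutationGroup> failed = result.getFailedMutations();
*
* // In a batch pipeline, delay another step until all mutations have been written.
* otherPCollection.apply(Wait.on(result.getOutput()));
* }</pre>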
*
* <h3>Batching</h3>
*
* <p>To reduce the number of transactions sent to Spanner, the {@link Mutation Mutations} are
* grouped into batches. The default maximum size of a batch is 1MB or 5000 mutated cells. To
* override these limits, use {@link Write#withBatchSizeBytes(long) withBatchSizeBytes()} and
* {@link Write#withMaxNumMutations(long) withMaxNumMutations()}. Setting either to a small value
* or zero disables batching.
*
* <p>Note that the <a
* href="https://cloud.google.com/spanner/quotas#limits_for_creating_reading_updating_and_deleting_data">maximum
* size of a single transaction</a> is 20,000 mutated cells, including cells in indexes. If you
* have a large number of indexes and are getting exceptions with the message <tt>INVALID_ARGUMENT:
* The transaction contains too many mutations</tt>, you will need to specify a smaller {@code
* MaxNumMutations}.
*
* <p>The batches written are obtained by grouping enough {@link Mutation Mutations} from the
* bundle provided by Beam to form (by default) 1000 batches. This group of {@link Mutation
* Mutations} is then sorted by key, and the batches are created from the sorted group, so that
* each batch will have keys that are 'close' to each other to optimise write performance. This
* grouping factor (number of batches) is controlled by the parameter {@link
* Write#withGroupingFactor(int) withGroupingFactor()}.<br>
* Note that each worker will need enough memory to hold {@code GroupingFactor x
* MaxBatchSizeBytes} of Mutations, so if you have a large {@code MaxBatchSizeBytes} you may need
* to reduce {@code GroupingFactor}.
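*
* <p>For example, to use smaller batches and a smaller grouping factor (a sketch; the values
* below are illustrative only):
*
* <pre>{@code
* SpannerWriteResult result = mutations.apply(
*     "Write", SpannerIO.write()
*         .withInstanceId("instance")
*         .withDatabaseId("database")
*         .withBatchSizeBytes(64 * 1024)   // 64KB of mutations per batch.
*         .withMaxNumMutations(1000)       // At most 1000 mutated cells per batch.
*         .withGroupingFactor(100));       // Sort and group 100 batches at a time.
* }</pre>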
*
* <h3>Database Schema Preparation</h3>
*
* <p>The Write transform reads the database schema on pipeline start. If the schema is created as
* part of the same pipeline, this transform needs to wait until that has happened. Use {@link
* Write#withSchemaReadySignal(PCollection)} to pass a signal {@link PCollection} which will be
* used with {@link Wait.OnSignal} to prevent the schema from being read until it is ready. The
* Write transform will be paused until the signal {@link PCollection} is closed.
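*
* <p>A sketch, assuming a hypothetical {@code schemaReady} PCollection produced by the pipeline
* step that creates the schema:
*
* <pre>{@code
* PCollection<Void> schemaReady = ...; // Emits an element once the schema exists.
*
* SpannerWriteResult result = mutations.apply(
*     "Write", SpannerIO.write()
*         .withInstanceId("instance")
*         .withDatabaseId("database")
*         .withSchemaReadySignal(schemaReady));
* }</pre>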
*
* <h3>Transactions</h3>
*
* <p>The transform does not provide the same transactional guarantees as Cloud Spanner. In
* particular,
*
* <ul>
*   <li>Individual Mutations are submitted atomically, but not all Mutations are submitted in the
*       same transaction;
*   <li>A Mutation is applied at least once;
*   <li>If the pipeline was unexpectedly stopped, mutations that were already applied will not get
*       rolled back.
* </ul>
*
* <p>Use {@link MutationGroup MutationGroups} with the {@link WriteGrouped} transform to ensure
* that a small set of mutations is bundled together. It is guaranteed that mutations in a {@link
* MutationGroup} are submitted in the same transaction. Note that a MutationGroup must not exceed
* the Spanner transaction limits.
*
* <pre>{@code
* // Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
* PCollection<MutationGroup> mutationGroups = ...;
* // Write mutation groups.
* SpannerWriteResult result = mutationGroups.apply(
*     "Write",
*     SpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
* }</pre>
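*
* <p>A {@link MutationGroup} is built from a primary mutation and any number of related
* mutations, for example (a sketch; the table and column names are illustrative):
*
* <pre>{@code
* Mutation user = Mutation.newInsertOrUpdateBuilder("users")
*     .set("id").to(userId)
*     .set("name").to(name)
*     .build();
* Mutation profile = Mutation.newInsertOrUpdateBuilder("profiles")
*     .set("user_id").to(userId)
*     .set("bio").to(bio)
*     .build();
* // Both mutations are guaranteed to be submitted in the same transaction.
* MutationGroup mutationGroup = MutationGroup.create(user, profile);
* }</pre>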
*
* <h3>Streaming Support</h3>
*
* <p>{@link SpannerIO.Write} can be used as a streaming sink; however, as with batch mode, note
* that the write order of individual {@link Mutation}/{@link MutationGroup} objects is not
* guaranteed.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class SpannerIO {
private static final Logger LOG = LoggerFactory.getLogger(SpannerIO.class);
private static final long DEFAULT_BATCH_SIZE_BYTES = 1024L * 1024L; // 1 MB
// Max number of mutated cells to batch together.
private static final int DEFAULT_MAX_NUM_MUTATIONS = 5000;
// Multiple of the batch limits used when gathering and sorting mutations before batching.
private static final int DEFAULT_GROUPING_FACTOR = 1000;
/**
* Creates an uninitialized instance of {@link Read}. Before use, the {@link Read} must be
* configured with a {@link Read#withInstanceId} and {@link Read#withDatabaseId} that identify the
* Cloud Spanner database.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public static Read read() {
return new AutoValue_SpannerIO_Read.Builder()
.setSpannerConfig(SpannerConfig.create())
.setTimestampBound(TimestampBound.strong())
.setReadOperation(ReadOperation.create())
.setBatching(true)
.build();
}
/**
* A {@link PTransform} that works like {@link #read}, but executes read operations coming from a
* {@link PCollection}.
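*
* <p>For example (a sketch):
*
* <pre>{@code
* PCollection<ReadOperation> reads = ...;
* PCollection<Struct> rows = reads.apply(
*     SpannerIO.readAll()
*         .withInstanceId(instanceId)
*         .withDatabaseId(dbId));
* }</pre>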
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public static ReadAll readAll() {
return new AutoValue_SpannerIO_ReadAll.Builder()
.setSpannerConfig(SpannerConfig.create())
.setTimestampBound(TimestampBound.strong())
.setBatching(true)
.build();
}
/**
* Returns a transform that creates a batch transaction. By default, a {@link
* TimestampBound#strong()} transaction is created; to override this, use {@link
* CreateTransaction#withTimestampBound(TimestampBound)}.
*/
@Experimental
public static CreateTransaction createTransaction() {
return new AutoValue_SpannerIO_CreateTransaction.Builder()
.setSpannerConfig(SpannerConfig.create())
.setTimestampBound(TimestampBound.strong())
.build();
}
/**
* Creates an uninitialized instance of {@link Write}. Before use, the {@link Write} must be
* configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify
* the Cloud Spanner database being written to.
*/
@Experimental
public static Write write() {
return new AutoValue_SpannerIO_Write.Builder()
.setSpannerConfig(SpannerConfig.create())
.setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES)
.setMaxNumMutations(DEFAULT_MAX_NUM_MUTATIONS)
.setGroupingFactor(DEFAULT_GROUPING_FACTOR)
.setFailureMode(FailureMode.FAIL_FAST)
.build();
}
/** Implementation of {@link #readAll}. */
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract static class ReadAll
extends PTransform<PCollection<ReadOperation>, PCollection<Struct>> {
abstract SpannerConfig getSpannerConfig();
@Nullable
abstract PCollectionView<Transaction> getTransaction();
@Nullable
abstract TimestampBound getTimestampBound();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
abstract Builder setTransaction(PCollectionView<Transaction> transaction);
abstract Builder setTimestampBound(TimestampBound timestampBound);
abstract Builder setBatching(Boolean batching);
abstract ReadAll build();
}
/** Specifies the Cloud Spanner configuration. */
public ReadAll withSpannerConfig(SpannerConfig spannerConfig) {
return toBuilder().setSpannerConfig(spannerConfig).build();
}
/** Specifies the Cloud Spanner project. */
public ReadAll withProjectId(String projectId) {
return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
}
/** Specifies the Cloud Spanner project. */
public ReadAll withProjectId(ValueProvider<String> projectId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withProjectId(projectId));
}
/** Specifies the Cloud Spanner instance. */
public ReadAll withInstanceId(String instanceId) {
return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
}
/** Specifies the Cloud Spanner instance. */
public ReadAll withInstanceId(ValueProvider<String> instanceId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withInstanceId(instanceId));
}
/** Specifies the Cloud Spanner database. */
public ReadAll withDatabaseId(String databaseId) {
return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
}
/** Specifies the Cloud Spanner host. */
public ReadAll withHost(ValueProvider<String> host) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withHost(host));
}
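/** Specifies the Cloud Spanner host. */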
public ReadAll withHost(String host) {
return withHost(ValueProvider.StaticValueProvider.of(host));
}
/** Specifies the Cloud Spanner database. */
public ReadAll withDatabaseId(ValueProvider<String> databaseId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withDatabaseId(databaseId));
}
@VisibleForTesting
ReadAll withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withServiceFactory(serviceFactory));
}
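/** Specifies the transaction to read from, created via {@link SpannerIO#createTransaction()}. */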
public ReadAll withTransaction(PCollectionView<Transaction> transaction) {
return toBuilder().setTransaction(transaction).build();
}
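/** Specifies the read timestamp; shorthand for {@code withTimestampBound(TimestampBound.ofReadTimestamp(timestamp))}. */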
public ReadAll withTimestamp(Timestamp timestamp) {
return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp));
}
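/** Specifies the {@link TimestampBound} that controls the staleness of the read. */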
public ReadAll withTimestampBound(TimestampBound timestampBound) {
return toBuilder().setTimestampBound(timestampBound).build();
}
/**
* By default, the Batch API is used to read data from Cloud Spanner. It is useful to disable
* batching when the underlying query is not root-partitionable.
*/
public ReadAll withBatching(boolean batching) {
return toBuilder().setBatching(batching).build();
}
abstract Boolean getBatching();
@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {
PTransform<PCollection<ReadOperation>, PCollection<Struct>> readTransform;
if (getBatching()) {
readTransform =
BatchSpannerRead.create(getSpannerConfig(), getTransaction(), getTimestampBound());
} else {
readTransform =
NaiveSpannerRead.create(getSpannerConfig(), getTransaction(), getTimestampBound());
}
return input
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Read from Cloud Spanner", readTransform);
}
}
/** Implementation of {@link #read}. */
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {
abstract SpannerConfig getSpannerConfig();
abstract ReadOperation getReadOperation();
@Nullable
abstract TimestampBound getTimestampBound();
@Nullable
abstract PCollectionView<Transaction> getTransaction();
@Nullable
abstract PartitionOptions getPartitionOptions();
abstract Boolean getBatching();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
abstract Builder setReadOperation(ReadOperation readOperation);
abstract Builder setTimestampBound(TimestampBound timestampBound);
abstract Builder setTransaction(PCollectionView<Transaction> transaction);
abstract Builder setPartitionOptions(PartitionOptions partitionOptions);
abstract Builder setBatching(Boolean batching);
abstract Read build();
}
/** Specifies the Cloud Spanner configuration. */
public Read withSpannerConfig(SpannerConfig spannerConfig) {
return toBuilder().setSpannerConfig(spannerConfig).build();
}
/** Specifies the Cloud Spanner project. */
public Read withProjectId(String projectId) {
return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
}
/** Specifies the Cloud Spanner project. */
public Read withProjectId(ValueProvider<String> projectId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withProjectId(projectId));
}
/** Specifies the Cloud Spanner instance. */
public Read withInstanceId(String instanceId) {
return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
}
/** Specifies the Cloud Spanner instance. */
public Read withInstanceId(ValueProvider<String> instanceId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withInstanceId(instanceId));
}
/** Specifies the Cloud Spanner database. */
public Read withDatabaseId(String databaseId) {
return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
}
/** Specifies the Cloud Spanner database. */
public Read withDatabaseId(ValueProvider<String> databaseId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withDatabaseId(databaseId));
}
/** Specifies the Cloud Spanner host. */
public Read withHost(ValueProvider<String> host) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withHost(host));
}
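/** Specifies the Cloud Spanner host. */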
public Read withHost(String host) {
return withHost(ValueProvider.StaticValueProvider.of(host));
}
/** If true, the Cloud Spanner Batch API is used for reading. */
public Read withBatching(boolean batching) {
return toBuilder().setBatching(batching).build();
}
@VisibleForTesting
Read withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withServiceFactory(serviceFactory));
}
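/** Specifies the transaction to read from, created via {@link SpannerIO#createTransaction()}. */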
public Read withTransaction(PCollectionView<Transaction> transaction) {
return toBuilder().setTransaction(transaction).build();
}
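/** Specifies the read timestamp; shorthand for {@code withTimestampBound(TimestampBound.ofReadTimestamp(timestamp))}. */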
public Read withTimestamp(Timestamp timestamp) {
return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp));
}
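/** Specifies the {@link TimestampBound} that controls the staleness of the read. */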
public Read withTimestampBound(TimestampBound timestampBound) {
return toBuilder().setTimestampBound(timestampBound).build();
}
public Read withTable(String table) {
return withReadOperation(getReadOperation().withTable(table));
}
public Read withReadOperation(ReadOperation operation) {
return toBuilder().setReadOperation(operation).build();
}
public Read withColumns(String... columns) {
return withColumns(Arrays.asList(columns));
}
public Read withColumns(List<String> columns) {
return withReadOperation(getReadOperation().withColumns(columns));
}
public Read withQuery(Statement statement) {
return withReadOperation(getReadOperation().withQuery(statement));
}
public Read withQuery(String sql) {
return withQuery(Statement.of(sql));
}
public Read withKeySet(KeySet keySet) {
return withReadOperation(getReadOperation().withKeySet(keySet));
}
public Read withIndex(String index) {
return withReadOperation(getReadOperation().withIndex(index));
}
public Read withPartitionOptions(PartitionOptions partitionOptions) {
return withReadOperation(getReadOperation().withPartitionOptions(partitionOptions));
}
@Override
public PCollection<Struct> expand(PBegin input) {
getSpannerConfig().validate();
checkArgument(
getTimestampBound() != null,
"SpannerIO.read() runs in a read only transaction and requires timestamp to be set "
+ "with withTimestampBound or withTimestamp method");
if (getReadOperation().getQuery() != null) {
// TODO: validate query?
} else if (getReadOperation().getTable() != null) {
// Assume read
checkNotNull(
getReadOperation().getColumns(),
"For a read operation SpannerIO.read() requires a list of "
+ "columns to set with withColumns method");
checkArgument(
!getReadOperation().getColumns().isEmpty(),
"For a read operation SpannerIO.read() requires a"
+ " list of columns to set with withColumns method");
} else {
throw new IllegalArgumentException(
"SpannerIO.read() requires configuring query or read operation.");
}
ReadAll readAll =
readAll()
.withSpannerConfig(getSpannerConfig())
.withTimestampBound(getTimestampBound())
.withBatching(getBatching())
.withTransaction(getTransaction());
return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
}
}
/**
* A {@link PTransform} that creates a transaction.
*
* @see SpannerIO
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract static class CreateTransaction
extends PTransform<PBegin, PCollectionView<Transaction>> {
abstract SpannerConfig getSpannerConfig();
@Nullable
abstract TimestampBound getTimestampBound();
abstract Builder toBuilder();
@Override
public PCollectionView<Transaction> expand(PBegin input) {
getSpannerConfig().validate();
return input
.apply(Create.of(1))
.apply("Create transaction", ParDo.of(new CreateTransactionFn(this)))
.apply("As PCollectionView", View.asSingleton());
}
/** Specifies the Cloud Spanner configuration. */
public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) {
return toBuilder().setSpannerConfig(spannerConfig).build();
}
/** Specifies the Cloud Spanner project. */
public CreateTransaction withProjectId(String projectId) {
return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
}
/** Specifies the Cloud Spanner project. */
public CreateTransaction withProjectId(ValueProvider<String> projectId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withProjectId(projectId));
}
/** Specifies the Cloud Spanner instance. */
public CreateTransaction withInstanceId(String instanceId) {
return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
}
/** Specifies the Cloud Spanner instance. */
public CreateTransaction withInstanceId(ValueProvider<String> instanceId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withInstanceId(instanceId));
}
/** Specifies the Cloud Spanner database. */
public CreateTransaction withDatabaseId(String databaseId) {
return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
}
/** Specifies the Cloud Spanner database. */
public CreateTransaction withDatabaseId(ValueProvider<String> databaseId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withDatabaseId(databaseId));
}
/** Specifies the Cloud Spanner host. */
public CreateTransaction withHost(ValueProvider<String> host) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withHost(host));
}
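/** Specifies the Cloud Spanner host. */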
public CreateTransaction withHost(String host) {
return withHost(ValueProvider.StaticValueProvider.of(host));
}
@VisibleForTesting
CreateTransaction withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withServiceFactory(serviceFactory));
}
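/** Specifies the {@link TimestampBound} for the created transaction. */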
public CreateTransaction withTimestampBound(TimestampBound timestampBound) {
return toBuilder().setTimestampBound(timestampBound).build();
}
/** A builder for {@link CreateTransaction}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
public abstract Builder setTimestampBound(TimestampBound newTimestampBound);
public abstract CreateTransaction build();
}
}
/** A failure handling strategy. */
public enum FailureMode {
/** An invalid write to Spanner will cause the pipeline to fail. This is the default strategy. */
FAIL_FAST,
/** Invalid mutations will be returned as part of the result of the write transform. */
REPORT_FAILURES
}
/**
* A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner.
*
* @see SpannerIO
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract static class Write extends PTransform<PCollection<Mutation>, SpannerWriteResult> {
abstract SpannerConfig getSpannerConfig();
abstract long getBatchSizeBytes();
abstract long getMaxNumMutations();
abstract FailureMode getFailureMode();
@Nullable
abstract PCollection getSchemaReadySignal();
abstract int getGroupingFactor();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
abstract Builder setBatchSizeBytes(long batchSizeBytes);
abstract Builder setMaxNumMutations(long maxNumMutations);
abstract Builder setFailureMode(FailureMode failureMode);
abstract Builder setSchemaReadySignal(PCollection schemaReadySignal);
abstract Builder setGroupingFactor(int groupingFactor);
abstract Write build();
}
/** Specifies the Cloud Spanner configuration. */
public Write withSpannerConfig(SpannerConfig spannerConfig) {
return toBuilder().setSpannerConfig(spannerConfig).build();
}
/** Specifies the Cloud Spanner project. */
public Write withProjectId(String projectId) {
return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
}
/** Specifies the Cloud Spanner project. */
public Write withProjectId(ValueProvider<String> projectId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withProjectId(projectId));
}
/** Specifies the Cloud Spanner instance. */
public Write withInstanceId(String instanceId) {
return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
}
/** Specifies the Cloud Spanner instance. */
public Write withInstanceId(ValueProvider<String> instanceId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withInstanceId(instanceId));
}
/** Specifies the Cloud Spanner database. */
public Write withDatabaseId(String databaseId) {
return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
}
/** Specifies the Cloud Spanner database. */
public Write withDatabaseId(ValueProvider<String> databaseId) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withDatabaseId(databaseId));
}
/** Specifies the Cloud Spanner host. */
public Write withHost(ValueProvider<String> host) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withHost(host));
}
/** Specifies the Cloud Spanner host. */
public Write withHost(String host) {
return withHost(ValueProvider.StaticValueProvider.of(host));
}
@VisibleForTesting
Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withServiceFactory(serviceFactory));
}
/** Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. */
public WriteGrouped grouped() {
return new WriteGrouped(this);
}
/**
* Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1MB.
*/
public Write withBatchSizeBytes(long batchSizeBytes) {
return toBuilder().setBatchSizeBytes(batchSizeBytes).build();
}
/** Specifies failure mode. {@link FailureMode#FAIL_FAST} mode is selected by default. */
public Write withFailureMode(FailureMode failureMode) {
return toBuilder().setFailureMode(failureMode).build();
}
/**
* Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value
* is 5000.
*/
public Write withMaxNumMutations(long maxNumMutations) {
return toBuilder().setMaxNumMutations(maxNumMutations).build();
}
/**
* Specifies an optional input PCollection that can be used as the signal for {@link
* Wait.OnSignal} to indicate when the database schema is ready to be read.
*
* <p>To be used when the database schema is created by another section of the pipeline. This
* causes the transform to wait until the {@code signal PCollection} has been closed before
* reading the schema from the database.
*
* @see Wait.OnSignal
*/
public Write withSchemaReadySignal(PCollection signal) {
return toBuilder().setSchemaReadySignal(signal).build();
}
/**
* Specifies the multiple of the max mutation limits (in terms of both bytes per batch and cells
* per batch) that is used to select a set of mutations to sort by key for batching. This sort
* uses local memory on the workers, so using large values can cause out-of-memory errors.
* Default value is 1000.
*/
public Write withGroupingFactor(int groupingFactor) {
return toBuilder().setGroupingFactor(groupingFactor).build();
}
@Override
public SpannerWriteResult expand(PCollection<Mutation> input) {
getSpannerConfig().validate();
return input
.apply("To mutation group", ParDo.of(new ToMutationGroupFn()))
.apply("Write mutations to Cloud Spanner", new WriteGrouped(this));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
getSpannerConfig().populateDisplayData(builder);
builder.add(
DisplayData.item("batchSizeBytes", getBatchSizeBytes()).withLabel("Batch Size in Bytes"));
}
}
/**
* A singleton that compares encoded MutationGroups by their encoded key, wrapping {@code
* UnsignedBytes#lexicographicalComparator}, which unfortunately is not serializable.
*/
private enum EncodedKvMutationGroupComparator
implements Comparator<KV<byte[], byte[]>>, Serializable {
INSTANCE {
@Override
public int compare(KV<byte[], byte[]> a, KV<byte[], byte[]> b) {
return UnsignedBytes.lexicographicalComparator().compare(a.getKey(), b.getKey());
}
}
}
/** Same as {@link Write} but supports grouped mutations. */
public static class WriteGrouped
extends PTransform<PCollection<MutationGroup>, SpannerWriteResult> {
private final Write spec;
private static final TupleTag<MutationGroup> BATCHABLE_MUTATIONS_TAG =
new TupleTag<MutationGroup>("batchableMutations") {};
private static final TupleTag<Iterable<MutationGroup>> UNBATCHABLE_MUTATIONS_TAG =
new TupleTag<Iterable<MutationGroup>>("unbatchableMutations") {};
private static final TupleTag<Void> MAIN_OUT_TAG = new TupleTag<Void>("mainOut") {};
private static final TupleTag<MutationGroup> FAILED_MUTATIONS_TAG =
new TupleTag<MutationGroup>("failedMutations") {};
private static final SerializableCoder<MutationGroup> CODER =
SerializableCoder.of(MutationGroup.class);
public WriteGrouped(Write spec) {
this.spec = spec;
}
@Override
public SpannerWriteResult expand(PCollection<MutationGroup> input) {
// First, read the Cloud Spanner schema.
PCollection<Void> schemaSeed =
input.getPipeline().apply("Create Seed", Create.of((Void) null));
if (spec.getSchemaReadySignal() != null) {
// Wait for external signal before reading schema.
schemaSeed = schemaSeed.apply("Wait for schema", Wait.on(spec.getSchemaReadySignal()));
}
final PCollectionView<SpannerSchema> schemaView =
schemaSeed
.apply(
"Read information schema",
ParDo.of(new ReadSpannerSchema(spec.getSpannerConfig())))
.apply("Schema View", View.asSingleton());
// Split the mutations into batchable and unbatchable mutations.
// Filter out mutation groups too big to be batched.
PCollectionTuple filteredMutations =
input
.apply("To Global Window", Window.into(new GlobalWindows()))
.apply(
"Filter Unbatchable Mutations",
ParDo.of(
new BatchableMutationFilterFn(
schemaView,
UNBATCHABLE_MUTATIONS_TAG,
spec.getBatchSizeBytes(),
spec.getMaxNumMutations()))
.withSideInputs(schemaView)
.withOutputTags(
BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
// Build a set of Mutation groups from the current bundle,
// sort them by table/key then split into batches.
PCollection<Iterable<MutationGroup>> batchedMutations =
filteredMutations
.get(BATCHABLE_MUTATIONS_TAG)
.apply(
"Gather And Sort",
ParDo.of(
new GatherBundleAndSortFn(
spec.getBatchSizeBytes(),
spec.getMaxNumMutations(),
spec.getGroupingFactor(),
schemaView))
.withSideInputs(schemaView))
.apply(
"Create Batches",
ParDo.of(
new BatchFn(
spec.getBatchSizeBytes(), spec.getMaxNumMutations(), schemaView))
.withSideInputs(schemaView));
// Merge the batchable and unbatchable mutations and write to Spanner.
PCollectionTuple result =
PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG))
.and(batchedMutations)
.apply("Merge", Flatten.pCollections())
.apply(
"Write mutations to Spanner",
ParDo.of(
new WriteToSpannerFn(
spec.getSpannerConfig(), spec.getFailureMode(), FAILED_MUTATIONS_TAG))
.withOutputTags(MAIN_OUT_TAG, TupleTagList.of(FAILED_MUTATIONS_TAG)));
return new SpannerWriteResult(
input.getPipeline(),
result.get(MAIN_OUT_TAG),
result.get(FAILED_MUTATIONS_TAG),
FAILED_MUTATIONS_TAG);
}
@VisibleForTesting
static MutationGroup decode(byte[] bytes) {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
try {
return CODER.decode(bis);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@VisibleForTesting
static byte[] encode(MutationGroup g) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
CODER.encode(g, bos);
} catch (IOException e) {
throw new RuntimeException(e);
}
return bos.toByteArray();
}
}
private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> {
@ProcessElement
public void processElement(ProcessContext c) {
Mutation value = c.element();
c.output(MutationGroup.create(value));
}
}
/**
* Gathers a set of mutations together, gets the keys, encodes them to byte[], sorts them and then
* outputs the encoded sorted list.
*
* <p>Testing notes: With very small amounts of data, each mutation group is in a separate bundle,
* and as batching and sorting are per-bundle, this effectively means that no batching will occur.
* Therefore this DoFn has to be tested in isolation.
*/
@VisibleForTesting
static class GatherBundleAndSortFn extends DoFn<MutationGroup, Iterable<KV<byte[], byte[]>>> {
private final long maxBatchSizeBytes;
private final long maxNumMutations;
// total size of the current batch.
private long batchSizeBytes;
// total number of mutated cells including indices.
private long batchCells;
private final PCollectionView<SpannerSchema> schemaView;
private transient ArrayList<KV<byte[], byte[]>> mutationsToSort = null;
GatherBundleAndSortFn(
long maxBatchSizeBytes,
long maxNumMutations,
long groupingFactor,
PCollectionView<SpannerSchema> schemaView) {
this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
this.maxNumMutations = maxNumMutations * groupingFactor;
this.schemaView = schemaView;
}
@StartBundle
public synchronized void startBundle() throws Exception {
if (mutationsToSort == null) {
initSorter();
} else {
throw new IllegalStateException("Sorter should be null here");
}
}
private void initSorter() {
mutationsToSort = new ArrayList<KV<byte[], byte[]>>((int) maxNumMutations);
batchSizeBytes = 0;
batchCells = 0;
}
@FinishBundle
public synchronized void finishBundle(FinishBundleContext c) throws Exception {
c.output(sortAndGetList(), Instant.now(), GlobalWindow.INSTANCE);
}
private Iterable<KV<byte[], byte[]>> sortAndGetList() throws IOException {
mutationsToSort.sort(EncodedKvMutationGroupComparator.INSTANCE);
ArrayList<KV<byte[], byte[]>> tmp = mutationsToSort;
// Ensure no more mutations can be added.
mutationsToSort = null;
return tmp;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
SpannerSchema spannerSchema = c.sideInput(schemaView);
MutationKeyEncoder encoder = new MutationKeyEncoder(spannerSchema);
MutationGroup mg = c.element();
long groupSize = MutationSizeEstimator.sizeOf(mg);
long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
synchronized (this) {
if (((batchCells + groupCells) > maxNumMutations)
|| (batchSizeBytes + groupSize) > maxBatchSizeBytes) {
c.output(sortAndGetList());
initSorter();
}
mutationsToSort.add(KV.of(encoder.encodeTableNameAndKey(mg.primary()), encode(mg)));
batchSizeBytes += groupSize;
batchCells += groupCells;
}
}
}
/** Batches mutations together. */
@VisibleForTesting
static class BatchFn extends DoFn<Iterable<KV<byte[], byte[]>>, Iterable<MutationGroup>> {
private final long maxBatchSizeBytes;
private final long maxNumMutations;
private final PCollectionView<SpannerSchema> schemaView;
BatchFn(
long maxBatchSizeBytes, long maxNumMutations, PCollectionView<SpannerSchema> schemaView) {
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.maxNumMutations = maxNumMutations;
this.schemaView = schemaView;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
SpannerSchema spannerSchema = c.sideInput(schemaView);
// Current batch of mutations to be written.
ImmutableList.Builder<MutationGroup> batch = ImmutableList.builder();
// total size of the current batch.
long batchSizeBytes = 0;
// total number of mutated cells including indices.
long batchCells = 0;
// Iterate through list, outputting whenever a batch is complete.
for (KV<byte[], byte[]> kv : c.element()) {
MutationGroup mg = decode(kv.getValue());
long groupSize = MutationSizeEstimator.sizeOf(mg);
long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
if (((batchCells + groupCells) > maxNumMutations)
|| ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
// Batch is full: output and reset.
c.output(batch.build());
batch = ImmutableList.builder();
batchSizeBytes = 0;
batchCells = 0;
}
batch.add(mg);
batchSizeBytes += groupSize;
batchCells += groupCells;
}
// End of list, output what is left.
if (batchCells > 0) {
c.output(batch.build());
}
}
}
/**
* Filters MutationGroups larger than the batch size to the output tagged with {@code
* UNBATCHABLE_MUTATIONS_TAG}.
*
* <p>Testing notes: As batching does not occur during full pipeline testing, this DoFn must be
* tested in isolation.
*/
@VisibleForTesting
static class BatchableMutationFilterFn extends DoFn<MutationGroup, MutationGroup> {
private final PCollectionView<SpannerSchema> schemaView;
private final TupleTag<Iterable<MutationGroup>> unbatchableMutationsTag;
private final long batchSizeBytes;
private final long maxNumMutations;
private final Counter batchableMutationGroupsCounter =
Metrics.counter(WriteGrouped.class, "batchable_mutation_groups");
private final Counter unBatchableMutationGroupsCounter =
Metrics.counter(WriteGrouped.class, "unbatchable_mutation_groups");
BatchableMutationFilterFn(
PCollectionView<SpannerSchema> schemaView,
TupleTag<Iterable<MutationGroup>> unbatchableMutationsTag,
long batchSizeBytes,
long maxNumMutations) {
this.schemaView = schemaView;
this.unbatchableMutationsTag = unbatchableMutationsTag;
this.batchSizeBytes = batchSizeBytes;
this.maxNumMutations = maxNumMutations;
}
@DoFn.ProcessElement
public void processElement(ProcessContext c) {
MutationGroup mg = c.element();
if (mg.primary().getOperation() == Op.DELETE && !isPointDelete(mg.primary())) {
// Ranged deletes are not batchable.
c.output(unbatchableMutationsTag, Arrays.asList(mg));
unBatchableMutationGroupsCounter.inc();
return;
}
SpannerSchema spannerSchema = c.sideInput(schemaView);
long groupSize = MutationSizeEstimator.sizeOf(mg);
long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
if (groupSize >= batchSizeBytes || groupCells >= maxNumMutations) {
c.output(unbatchableMutationsTag, Arrays.asList(mg));
unBatchableMutationGroupsCounter.inc();
} else {
c.output(mg);
batchableMutationGroupsCounter.inc();
}
}
}
private static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {
private transient SpannerAccessor spannerAccessor;
private final SpannerConfig spannerConfig;
private final FailureMode failureMode;
private final Counter mutationGroupBatchesCounter =
Metrics.counter(WriteGrouped.class, "mutation_group_batches");
private final Counter mutationGroupWriteSuccessCounter =
Metrics.counter(WriteGrouped.class, "mutation_groups_write_success");
private final Counter mutationGroupWriteFailCounter =
Metrics.counter(WriteGrouped.class, "mutation_groups_write_fail");
private final TupleTag<MutationGroup> failedTag;
WriteToSpannerFn(
SpannerConfig spannerConfig, FailureMode failureMode, TupleTag<MutationGroup> failedTag) {
this.spannerConfig = spannerConfig;
this.failureMode = failureMode;
this.failedTag = failedTag;
}
@Setup
public void setup() throws Exception {
spannerAccessor = spannerConfig.connectToSpanner();
}
@Teardown
public void teardown() throws Exception {
spannerAccessor.close();
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Iterable<MutationGroup> mutations = c.element();
boolean tryIndividual = false;
// Try to write the whole batch of mutation groups in a single call.
try {
mutationGroupBatchesCounter.inc();
Iterable<Mutation> batch = Iterables.concat(mutations);
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
mutationGroupWriteSuccessCounter.inc(Iterables.size(mutations));
return;
} catch (SpannerException e) {
if (failureMode == FailureMode.REPORT_FAILURES) {
tryIndividual = true;
} else if (failureMode == FailureMode.FAIL_FAST) {
throw e;
} else {
throw new IllegalArgumentException("Unknown failure mode " + failureMode);
}
}
if (tryIndividual) {
for (MutationGroup mg : mutations) {
try {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(mg);
mutationGroupWriteSuccessCounter.inc();
} catch (SpannerException e) {
mutationGroupWriteFailCounter.inc();
LOG.warn("Failed to write the mutation group: " + mg, e);
c.output(failedTag, mg);
}
}
}
}
}
private SpannerIO() {} // Prevent construction.
}