/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.datastore;
import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Verify.verify;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.GqlQuery;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Key.PathElement;
import com.google.datastore.v1.Mutation;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreFactory;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
import com.google.rpc.Code;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections} of
* <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1 {@link
* Entity} objects. Read is only supported for Bounded PCollections while Write and Delete are
* supported for both Bounded and Unbounded PCollections.
*
* <p>This API currently requires an authentication workaround. To use {@link DatastoreV1}, users
* must use the {@code gcloud} command line tool to get credentials for Cloud Datastore:
*
* <pre>
* $ gcloud auth login
* </pre>
*
* <p>To read a {@link PCollection} from a query to Cloud Datastore, use {@link DatastoreV1#read}
* and its methods {@link DatastoreV1.Read#withProjectId} and {@link DatastoreV1.Read#withQuery} to
* specify the project to query and the query to read from. You can optionally provide a namespace
* to query within using {@link DatastoreV1.Read#withNamespace}. You can also optionally specify
* how many splits you want for the query using {@link DatastoreV1.Read#withNumQuerySplits}.
*
* <p>For example:
*
* <pre>{@code
* // Read a query from Datastore
* PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
* Query query = ...;
* String projectId = "...";
*
* Pipeline p = Pipeline.create(options);
* PCollection<Entity> entities = p.apply(
* DatastoreIO.v1().read()
* .withProjectId(projectId)
* .withQuery(query));
* }</pre>
*
* <p><b>Note:</b> A runner may read from Cloud Datastore in parallel across many workers. However,
* when the {@link Query} is configured with a limit using {@link
* com.google.datastore.v1.Query.Builder#setLimit(Int32Value)} or if the Query contains inequality
* filters like {@code GREATER_THAN, LESS_THAN} etc., then all returned results will be read by a
* single worker in order to ensure correct data. Since data is read from a single worker, this
* could have a significant impact on the performance of the job.
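*
* <p>For example, this sketch of a query carrying a limit would be read by a single worker:
*
* <pre>{@code
* Query limited = query.toBuilder().setLimit(Int32Value.newBuilder().setValue(100)).build();
* }</pre>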
*
* <p>To write a {@link PCollection} to a Cloud Datastore, use {@link DatastoreV1#write}, specifying
* the Cloud Datastore project to write to:
*
* <pre>{@code
* PCollection<Entity> entities = ...;
* entities.apply(DatastoreIO.v1().write().withProjectId(projectId));
* p.run();
* }</pre>
*
* <p>To delete a {@link PCollection} of {@link Entity Entities} from Cloud Datastore, use {@link
* DatastoreV1#deleteEntity()}, specifying the Cloud Datastore project to delete from:
*
* <pre>{@code
* PCollection<Entity> entities = ...;
* entities.apply(DatastoreIO.v1().deleteEntity().withProjectId(projectId));
* p.run();
* }</pre>
*
* <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Cloud
* Datastore, use {@link DatastoreV1#deleteKey}, specifying the Cloud Datastore project to delete
* from:
*
* <pre>{@code
* PCollection<Key> keys = ...;
* keys.apply(DatastoreIO.v1().deleteKey().withProjectId(projectId));
* p.run();
* }</pre>
*
* <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
* {@link Key Keys}. A complete {@code Key} specifies the {@code name} or {@code id} of its final
* path element, where an incomplete {@code Key} does not. A {@code namespace} other than the
* project default may be used by specifying it in the {@code Entity} {@code Keys}:
*
* <pre>{@code
* Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
* keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
* }</pre>
*
* <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
* read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
* and Keys</a> for more information about {@code Entity} keys.
*
* <h3>Permissions</h3>
*
* Permission requirements depend on the {@code PipelineRunner} that is used to execute the
* pipeline. Please refer to the documentation of corresponding {@code PipelineRunner}s for more
* details.
*
* <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign
* Up</a> for security and permission related information specific to Cloud Datastore.
*
* <p>Optionally, a Cloud Datastore V1 Emulator running locally can be used for testing by
* providing the host and port information via {@code withLocalhost("host:port")} on any of the
* above transforms. In that case, all Cloud Datastore API calls are directed to the Emulator.
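*
* <p>For example, a read against a local emulator might look like the following sketch (the
* {@code localhost:8081} address is illustrative; use whatever host:port your emulator reports):
*
* <pre>{@code
* PCollection<Entity> entities = p.apply(
*     DatastoreIO.v1().read()
*         .withProjectId(projectId)
*         .withQuery(query)
*         .withLocalhost("localhost:8081"));
* }</pre>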
*
* @see PipelineRunner
*/
public class DatastoreV1 {
// A package-private constructor to prevent direct instantiation from outside of this package
DatastoreV1() {}
/**
* The number of entity updates written per RPC, initially. We buffer updates in the connector and
* write a batch to Datastore once we have collected a certain number. This is the initial batch
* size; it is adjusted at runtime based on the performance of previous writes (see {@link
* DatastoreV1.WriteBatcher}).
*
* <p>Testing has found that a batch of 200 entities will generally finish within the timeout even
* in adverse conditions.
*/
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200;
/**
* When choosing the number of updates in a single RPC, never exceed the maximum allowed by the
* API.
*/
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500;
/**
* When choosing the number of updates in a single RPC, do not go below this value. The actual
* number of entities per request may be lower when we flush for the end of a bundle or if we hit
* {@link DatastoreV1#DATASTORE_BATCH_UPDATE_BYTES_LIMIT}.
*/
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10;
/**
* Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
* exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
* the mutations themselves and not the CommitRequest wrapper around them.
*/
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000;
/**
* Non-retryable errors. See https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes.
*/
private static final Set<Code> NON_RETRYABLE_ERRORS =
ImmutableSet.of(
Code.FAILED_PRECONDITION,
Code.INVALID_ARGUMENT,
Code.PERMISSION_DENIED,
Code.UNAUTHENTICATED);
/**
* Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
* {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using {@link
* DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery}, {@link
* DatastoreV1.Read#withNamespace}, {@link DatastoreV1.Read#withNumQuerySplits}.
*/
public DatastoreV1.Read read() {
return new AutoValue_DatastoreV1_Read.Builder().setNumQuerySplits(0).build();
}
/**
* A {@link PTransform} that reads the result rows of a Cloud Datastore query as {@code Entity}
* objects.
*
* @see DatastoreIO
*/
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<Entity>> {
private static final Logger LOG = LoggerFactory.getLogger(Read.class);
/** An upper bound on the number of splits for a query. */
public static final int NUM_QUERY_SPLITS_MAX = 50000;
/** A lower bound on the number of splits for a query. */
static final int NUM_QUERY_SPLITS_MIN = 12;
/** Default bundle size of 64MB. */
static final long DEFAULT_BUNDLE_SIZE_BYTES = 64L * 1024L * 1024L;
/**
* Maximum number of results to request per query.
*
* <p>Must be set, or it may result in an I/O error when querying Cloud Datastore.
*/
static final int QUERY_BATCH_LIMIT = 500;
@Nullable
public abstract ValueProvider<String> getProjectId();
@Nullable
public abstract Query getQuery();
@Nullable
public abstract ValueProvider<String> getLiteralGqlQuery();
@Nullable
public abstract ValueProvider<String> getNamespace();
public abstract int getNumQuerySplits();
@Nullable
public abstract String getLocalhost();
@Override
public abstract String toString();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setProjectId(ValueProvider<String> projectId);
abstract Builder setQuery(Query query);
abstract Builder setLiteralGqlQuery(ValueProvider<String> literalGqlQuery);
abstract Builder setNamespace(ValueProvider<String> namespace);
abstract Builder setNumQuerySplits(int numQuerySplits);
abstract Builder setLocalhost(String localhost);
abstract Read build();
}
/**
* Computes the number of splits to be performed on the given query by querying the estimated
* size from Cloud Datastore.
*/
static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
int numSplits;
try {
long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
numSplits =
(int)
Math.min(
NUM_QUERY_SPLITS_MAX,
Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
} catch (Exception e) {
LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, e);
// Fallback in case estimated size is unavailable.
numSplits = NUM_QUERY_SPLITS_MIN;
}
return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
}
/**
* Cloud Datastore system tables with statistics are periodically updated. This method fetches
* the latest timestamp (in microseconds) of the statistics update using the {@code
* __Stat_Total__} table (or {@code __Stat_Ns_Total__} when a namespace is provided).
*/
private static long queryLatestStatisticsTimestamp(
Datastore datastore, @Nullable String namespace) throws DatastoreException {
Query.Builder query = Query.newBuilder();
// Note: namespace either being null or empty represents the default namespace, in which
// case we treat it as not provided by the user.
if (Strings.isNullOrEmpty(namespace)) {
query.addKindBuilder().setName("__Stat_Total__");
} else {
query.addKindBuilder().setName("__Stat_Ns_Total__");
}
query.addOrder(makeOrder("timestamp", DESCENDING));
query.setLimit(Int32Value.newBuilder().setValue(1));
RunQueryRequest request = makeRequest(query.build(), namespace);
RunQueryResponse response = datastore.runQuery(request);
QueryResultBatch batch = response.getBatch();
if (batch.getEntityResultsCount() == 0) {
throw new NoSuchElementException("Datastore total statistics unavailable");
}
Entity entity = batch.getEntityResults(0).getEntity();
return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000;
}
/**
* Get the estimated size of the data returned by the given query.
*
* <p>Cloud Datastore provides no way to get a good estimate of how large the result of a query
* will be. As a rough approximation, we fetch the statistics of the entire entity kind being
* queried, using the {@code __Stat_Kind__} system table, assuming exactly 1 kind is specified
* in the query.
*
* <p>See https://cloud.google.com/datastore/docs/concepts/stats.
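*
* <p>Roughly, the statistics lookup below is equivalent to the GQL sketch {@code SELECT * FROM
* __Stat_Kind__ WHERE kind_name = <kind> AND timestamp = <latest>}, after which the {@code
* entity_bytes} property of the matching statistics entity is returned.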
*/
static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
throws DatastoreException {
String ourKind = query.getKind(0).getName();
long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
Query.Builder queryBuilder = Query.newBuilder();
if (Strings.isNullOrEmpty(namespace)) {
queryBuilder.addKindBuilder().setName("__Stat_Kind__");
} else {
queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
}
queryBuilder.setFilter(
makeAndFilter(
makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(),
makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build()));
RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
long now = System.currentTimeMillis();
RunQueryResponse response = datastore.runQuery(request);
LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
QueryResultBatch batch = response.getBatch();
if (batch.getEntityResultsCount() == 0) {
throw new NoSuchElementException(
"Datastore statistics for kind " + ourKind + " unavailable");
}
Entity entity = batch.getEntityResults(0).getEntity();
return entity.getProperties().get("entity_bytes").getIntegerValue();
}
private static PartitionId.Builder forNamespace(@Nullable String namespace) {
PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
// Namespace either being null or empty represents the default namespace.
// Datastore Client libraries expect users to not set the namespace proto field in
// either of these cases.
if (!Strings.isNullOrEmpty(namespace)) {
partitionBuilder.setNamespaceId(namespace);
}
return partitionBuilder;
}
/** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
return RunQueryRequest.newBuilder()
.setQuery(query)
.setPartitionId(forNamespace(namespace))
.build();
}
/** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */
@VisibleForTesting
static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) {
return RunQueryRequest.newBuilder()
.setGqlQuery(gqlQuery)
.setPartitionId(forNamespace(namespace))
.build();
}
/**
* A helper function to get the split queries, taking into account the optional {@code
* namespace}.
*/
private static List<Query> splitQuery(
Query query,
@Nullable String namespace,
Datastore datastore,
QuerySplitter querySplitter,
int numSplits)
throws DatastoreException {
// If namespace is set, include it in the split request so splits are calculated accordingly.
return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
}
/**
* Translates a Cloud Datastore gql query string to {@link Query}.
*
* <p>Currently, the only way to translate a gql query string to a Query is to run the query
* against Cloud Datastore and extract the {@code Query} from the response. To prevent reading
* any data, we set the {@code LIMIT} to 0 but if the gql query already has a limit set, we
* catch the exception with {@code INVALID_ARGUMENT} error code and retry the translation
* without the zero limit.
*
* <p>Note: This may result in reading actual data from Cloud Datastore but the service has a
* cap on the number of entities returned for a single rpc request, so this should not be a
* problem in practice.
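*
* <p>For example, a user query string {@code SELECT * FROM Foo} (the kind name is illustrative)
* is first issued as {@code SELECT * FROM Foo LIMIT 0}. If the user string already carried a
* limit, e.g. {@code SELECT * FROM Foo LIMIT 5}, appending another limit makes the query
* invalid, the service returns {@code INVALID_ARGUMENT}, and the translation is retried with
* the original string.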
*/
@VisibleForTesting
static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, String namespace)
throws DatastoreException {
String gqlQueryWithZeroLimit = gql + " LIMIT 0";
try {
Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace);
// Clear the limit that we set.
return translatedQuery.toBuilder().clearLimit().build();
} catch (DatastoreException e) {
// Note: There is no specific error code or message to detect if the query already has a
// limit, so we just check for INVALID_ARGUMENT and assume that the query might have
// a limit already set.
if (e.getCode() == Code.INVALID_ARGUMENT) {
LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage());
LOG.warn("User query might have a limit already set, so trying without zero limit");
// Retry without the zero limit.
return translateGqlQuery(gql, datastore, namespace);
} else {
throw e;
}
}
}
/** Translates a gql query string to {@link Query}. */
private static Query translateGqlQuery(String gql, Datastore datastore, String namespace)
throws DatastoreException {
GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
RunQueryRequest req = makeRequest(gqlQuery, namespace);
return datastore.runQuery(req).getQuery();
}
/**
* Returns a new {@link DatastoreV1.Read} that reads from the Cloud Datastore for the specified
* project.
*/
public DatastoreV1.Read withProjectId(String projectId) {
checkArgument(projectId != null, "projectId can not be null");
return toBuilder().setProjectId(StaticValueProvider.of(projectId)).build();
}
/** Same as {@link Read#withProjectId(String)} but with a {@link ValueProvider}. */
public DatastoreV1.Read withProjectId(ValueProvider<String> projectId) {
checkArgument(projectId != null, "projectId can not be null");
return toBuilder().setProjectId(projectId).build();
}
/**
* Returns a new {@link DatastoreV1.Read} that reads the results of the specified query.
*
* <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
* across many workers. However, when the {@link Query} is configured with a limit using {@link
* Query.Builder#setLimit}, then all results will be read by a single worker in order to ensure
* correct results.
*/
public DatastoreV1.Read withQuery(Query query) {
checkArgument(query != null, "query can not be null");
checkArgument(
!query.hasLimit() || query.getLimit().getValue() > 0,
"Invalid query limit %s: must be positive",
query.getLimit().getValue());
return toBuilder().setQuery(query).build();
}
/**
* Returns a new {@link DatastoreV1.Read} that reads the results of the specified GQL query. See
* the <a href="https://cloud.google.com/datastore/docs/reference/gql_reference">GQL Reference</a>
* for more about GQL grammar.
*
* <p><b><i>Note:</i></b> This query is executed with literals allowed, so users should ensure
* that the query originates from trusted sources to avoid injection vulnerabilities.
*
* <p><b><i>Experimental</i></b>: Cloud Datastore does not provide a clean way to translate a
* GQL query string to a {@link Query}, so we end up issuing a query to the service for
* translation; this may read actual data, although only a small amount. This feature needs more
* validation through production use cases before being marked stable.
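*
* <p>A minimal usage sketch (the kind name {@code Task} is illustrative):
*
* <pre>{@code
* PCollection<Entity> entities = p.apply(
*     DatastoreIO.v1().read()
*         .withProjectId(projectId)
*         .withLiteralGqlQuery("SELECT * FROM Task"));
* }</pre>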
*/
@Experimental(Kind.SOURCE_SINK)
public DatastoreV1.Read withLiteralGqlQuery(String gqlQuery) {
checkArgument(gqlQuery != null, "gqlQuery can not be null");
return toBuilder().setLiteralGqlQuery(StaticValueProvider.of(gqlQuery)).build();
}
/** Same as {@link Read#withLiteralGqlQuery(String)} but with a {@link ValueProvider}. */
@Experimental(Kind.SOURCE_SINK)
public DatastoreV1.Read withLiteralGqlQuery(ValueProvider<String> gqlQuery) {
checkArgument(gqlQuery != null, "gqlQuery can not be null");
if (gqlQuery.isAccessible()) {
checkArgument(gqlQuery.get() != null, "gqlQuery can not be null");
}
return toBuilder().setLiteralGqlQuery(gqlQuery).build();
}
/** Returns a new {@link DatastoreV1.Read} that reads from the given namespace. */
public DatastoreV1.Read withNamespace(String namespace) {
return toBuilder().setNamespace(StaticValueProvider.of(namespace)).build();
}
/** Same as {@link Read#withNamespace(String)} but with a {@link ValueProvider}. */
public DatastoreV1.Read withNamespace(ValueProvider<String> namespace) {
return toBuilder().setNamespace(namespace).build();
}
/**
* Returns a new {@link DatastoreV1.Read} that reads by splitting the given {@code query} into
* {@code numQuerySplits}.
*
* <p>The semantics for the query splitting is defined below:
*
* <ul>
* <li>Any value less than or equal to 0 will be ignored, and the number of splits will be
* chosen dynamically at runtime based on the query data size.
* <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at {@code
* NUM_QUERY_SPLITS_MAX}.
* <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be ignored
* and no split will be performed.
* <li>In some cases Cloud Datastore is unable to split a query into the requested number of
* splits. In such cases we just use however many splits Cloud Datastore returns.
* </ul>
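*
* <p>For example, a read requesting 100 splits might look like:
*
* <pre>{@code
* PCollection<Entity> entities = p.apply(
*     DatastoreIO.v1().read()
*         .withProjectId(projectId)
*         .withQuery(query)
*         .withNumQuerySplits(100));
* }</pre>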
*/
public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) {
return toBuilder()
.setNumQuerySplits(Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX))
.build();
}
/**
* Returns a new {@link DatastoreV1.Read} that reads from a Datastore Emulator running at the
* given localhost address.
*/
public DatastoreV1.Read withLocalhost(String localhost) {
return toBuilder().setLocalhost(localhost).build();
}
@Override
public PCollection<Entity> expand(PBegin input) {
checkArgument(getProjectId() != null, "projectId provider cannot be null");
if (getProjectId().isAccessible()) {
checkArgument(getProjectId().get() != null, "projectId cannot be null");
}
checkArgument(
getQuery() != null || getLiteralGqlQuery() != null,
"Either withQuery() or withLiteralGqlQuery() is required");
checkArgument(
getQuery() == null || getLiteralGqlQuery() == null,
"withQuery() and withLiteralGqlQuery() are exclusive");
V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), getLocalhost());
/*
* This composite transform involves the following steps:
*   1. Create a singleton of the user-provided {@code query}, or if a {@code gqlQuery} is
*   provided, apply a {@link ParDo} that translates the {@code gqlQuery} into a {@code query}.
*
*   2. A {@link ParDo} splits the resulting query into {@code numQuerySplits} sub-queries and
*   outputs a {@code PCollection<Query>}. If the value of {@code numQuerySplits} is less than
*   or equal to 0, the number of splits is computed dynamically based on the estimated size
*   of the data for the {@code query}.
*
*   3. The sub-queries are redistributed across workers via {@link Reshuffle} so that the
*   subsequent read is evenly balanced.
*
*   4. Finally, a {@code ParDo} reads entities for each sub-query and outputs a
*   {@code PCollection<Entity>}.
*/
PCollection<Query> inputQuery;
if (getQuery() != null) {
inputQuery = input.apply(Create.of(getQuery()));
} else {
inputQuery =
input
.apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
.apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
}
return inputQuery
.apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())))
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Read", ParDo.of(new ReadFn(v1Options)));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
String query = getQuery() == null ? null : getQuery().toString();
builder
.addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId"))
.addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace"))
.addIfNotNull(DisplayData.item("query", query).withLabel("Query"))
.addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery"));
}
@VisibleForTesting
static class V1Options implements HasDisplayData, Serializable {
private final ValueProvider<String> project;
@Nullable private final ValueProvider<String> namespace;
@Nullable private final String localhost;
private V1Options(
ValueProvider<String> project, ValueProvider<String> namespace, String localhost) {
this.project = project;
this.namespace = namespace;
this.localhost = localhost;
}
public static V1Options from(String projectId, String namespace, String localhost) {
return from(
StaticValueProvider.of(projectId), StaticValueProvider.of(namespace), localhost);
}
public static V1Options from(
ValueProvider<String> project, ValueProvider<String> namespace, String localhost) {
return new V1Options(project, namespace, localhost);
}
public String getProjectId() {
return project.get();
}
@Nullable
public String getNamespace() {
return namespace == null ? null : namespace.get();
}
public ValueProvider<String> getProjectValueProvider() {
return project;
}
@Nullable
public ValueProvider<String> getNamespaceValueProvider() {
return namespace;
}
@Nullable
public String getLocalhost() {
return localhost;
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(
DisplayData.item("projectId", getProjectValueProvider()).withLabel("ProjectId"))
.addIfNotNull(
DisplayData.item("namespace", getNamespaceValueProvider()).withLabel("Namespace"));
}
}
/** A DoFn that translates a Cloud Datastore gql query string to {@code Query}. */
static class GqlQueryTranslateFn extends DoFn<String, Query> {
private final V1Options v1Options;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
GqlQueryTranslateFn(V1Options options) {
this(options, new V1DatastoreFactory());
}
GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) {
this.v1Options = options;
this.datastoreFactory = datastoreFactory;
}
@StartBundle
public void startBundle(StartBundleContext c) throws Exception {
datastore =
datastoreFactory.getDatastore(
c.getPipelineOptions(), v1Options.getProjectId(), v1Options.getLocalhost());
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String gqlQuery = c.element();
LOG.info("User query: '{}'", gqlQuery);
Query query =
translateGqlQueryWithLimitCheck(gqlQuery, datastore, v1Options.getNamespace());
LOG.info("User gql query translated to Query({})", query);
c.output(query);
}
}
/**
* A {@link DoFn} that splits a given query into multiple sub-queries and outputs each sub-query
* individually.
*/
@VisibleForTesting
static class SplitQueryFn extends DoFn<Query, Query> {
private final V1Options options;
// number of splits to make for a given query
private final int numSplits;
private final V1DatastoreFactory datastoreFactory;
// Datastore client
private transient Datastore datastore;
// Query splitter
private transient QuerySplitter querySplitter;
public SplitQueryFn(V1Options options, int numSplits) {
this(options, numSplits, new V1DatastoreFactory());
}
@VisibleForTesting
SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) {
this.options = options;
this.numSplits = numSplits;
this.datastoreFactory = datastoreFactory;
}
@StartBundle
public void startBundle(StartBundleContext c) throws Exception {
datastore =
datastoreFactory.getDatastore(
c.getPipelineOptions(), options.getProjectId(), options.getLocalhost());
querySplitter = datastoreFactory.getQuerySplitter();
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Query query = c.element();
// If query has a user set limit, then do not split.
if (query.hasLimit()) {
c.output(query);
return;
}
int estimatedNumSplits;
// Compute the estimated numSplits if numSplits is not specified by the user.
if (numSplits <= 0) {
estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
} else {
estimatedNumSplits = numSplits;
}
LOG.info("Splitting the query into {} splits", estimatedNumSplits);
List<Query> querySplits;
try {
querySplits =
splitQuery(
query, options.getNamespace(), datastore, querySplitter, estimatedNumSplits);
} catch (Exception e) {
LOG.warn("Unable to parallelize the given query: {}", query, e);
querySplits = ImmutableList.of(query);
}
// Output the sub-queries; the downstream Reshuffle redistributes them across workers.
for (Query subquery : querySplits) {
c.output(subquery);
}
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.include("options", options);
if (numSplits > 0) {
builder.add(
DisplayData.item("numQuerySplits", numSplits)
.withLabel("Requested number of Query splits"));
}
}
}
/** A {@link DoFn} that reads entities from Cloud Datastore for each query. */
@VisibleForTesting
static class ReadFn extends DoFn<Query, Entity> {
private final V1Options options;
private final V1DatastoreFactory datastoreFactory;
// Datastore client
private transient Datastore datastore;
private final Counter rpcErrors =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
private final Counter rpcSuccesses =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
private static final int MAX_RETRIES = 5;
private static final FluentBackoff RUNQUERY_BACKOFF =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RETRIES)
.withInitialBackoff(Duration.standardSeconds(5));
public ReadFn(V1Options options) {
this(options, new V1DatastoreFactory());
}
@VisibleForTesting
ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
this.options = options;
this.datastoreFactory = datastoreFactory;
}
@StartBundle
public void startBundle(StartBundleContext c) throws Exception {
datastore =
datastoreFactory.getDatastore(
c.getPipelineOptions(), options.getProjectId(), options.getLocalhost());
}
private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = RUNQUERY_BACKOFF.backoff();
while (true) {
try {
RunQueryResponse response = datastore.runQuery(request);
rpcSuccesses.inc();
return response;
} catch (DatastoreException exception) {
rpcErrors.inc();
if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
throw exception;
}
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
}
}
}
}
/** Read and output entities for the given query. */
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
Query query = context.element();
String namespace = options.getNamespace();
int userLimit = query.hasLimit() ? query.getLimit().getValue() : Integer.MAX_VALUE;
boolean moreResults = true;
QueryResultBatch currentBatch = null;
while (moreResults) {
Query.Builder queryBuilder = query.toBuilder();
queryBuilder.setLimit(
Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
queryBuilder.setStartCursor(currentBatch.getEndCursor());
}
RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
RunQueryResponse response = runQueryWithRetries(request);
currentBatch = response.getBatch();
// MORE_RESULTS_AFTER_LIMIT is not implemented yet:
// https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
// use result count to determine if more results might exist.
int numFetch = currentBatch.getEntityResultsCount();
if (query.hasLimit()) {
verify(
userLimit >= numFetch,
"Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit",
userLimit,
numFetch,
query.getLimit());
userLimit -= numFetch;
}
// output all the entities from the current batch.
for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
context.output(entityResult.getEntity());
}
// Check if we have more entities to be read.
moreResults =
// User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied
(userLimit > 0)
// All indications from the API are that there are/may be more results.
&& ((numFetch == QUERY_BATCH_LIMIT)
|| (currentBatch.getMoreResults() == NOT_FINISHED));
}
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.include("options", options);
}
}
}
/**
* Returns an empty {@link DatastoreV1.Write} builder. Configure the destination {@code projectId}
* using {@link DatastoreV1.Write#withProjectId}.
*/
public Write write() {
return new Write(null, null);
}
/**
* Returns an empty {@link DeleteEntity} builder. Configure the destination {@code projectId}
* using {@link DeleteEntity#withProjectId}.
*/
public DeleteEntity deleteEntity() {
return new DeleteEntity(null, null);
}
/**
* Returns an empty {@link DeleteKey} builder. Configure the destination {@code projectId} using
* {@link DeleteKey#withProjectId}.
*/
public DeleteKey deleteKey() {
return new DeleteKey(null, null);
}
/**
* A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
*
* @see DatastoreIO
*/
public static class Write extends Mutate<Entity> {
/**
* Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it
* is {@code null} at instantiation time, an error will be thrown.
*/
Write(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
super(projectId, localhost, new UpsertFn());
}
/** Returns a new {@link Write} that writes to the Cloud Datastore for the specified project. */
public Write withProjectId(String projectId) {
checkArgument(projectId != null, "projectId can not be null");
return withProjectId(StaticValueProvider.of(projectId));
}
/** Same as {@link Write#withProjectId(String)} but with a {@link ValueProvider}. */
public Write withProjectId(ValueProvider<String> projectId) {
checkArgument(projectId != null, "projectId can not be null");
return new Write(projectId, localhost);
}
/**
* Returns a new {@link Write} that writes to the Cloud Datastore Emulator running locally on
* the specified host port.
*/
public Write withLocalhost(String localhost) {
checkArgument(localhost != null, "localhost can not be null");
return new Write(projectId, localhost);
}
}
/**
* A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
*
* @see DatastoreIO
*/
public static class DeleteEntity extends Mutate<Entity> {
/**
* Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it
* is {@code null} at instantiation time, an error will be thrown.
*/
DeleteEntity(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
super(projectId, localhost, new DeleteEntityFn());
}
/**
* Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
* specified project.
*/
public DeleteEntity withProjectId(String projectId) {
checkArgument(projectId != null, "projectId can not be null");
return withProjectId(StaticValueProvider.of(projectId));
}
/** Same as {@link DeleteEntity#withProjectId(String)} but with a {@link ValueProvider}. */
public DeleteEntity withProjectId(ValueProvider<String> projectId) {
checkArgument(projectId != null, "projectId can not be null");
return new DeleteEntity(projectId, localhost);
}
/**
* Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore Emulator
* running locally on the specified host port.
*/
public DeleteEntity withLocalhost(String localhost) {
checkArgument(localhost != null, "localhost can not be null");
return new DeleteEntity(projectId, localhost);
}
}
/**
* A {@link PTransform} that deletes {@link Entity Entities} associated with the given {@link Key
* Keys} from Cloud Datastore.
*
* @see DatastoreIO
*/
public static class DeleteKey extends Mutate<Key> {
/**
* Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it
* is {@code null} at instantiation time, an error will be thrown.
*/
DeleteKey(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
super(projectId, localhost, new DeleteKeyFn());
}
/**
* Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
* specified project.
*/
public DeleteKey withProjectId(String projectId) {
checkArgument(projectId != null, "projectId can not be null");
return withProjectId(StaticValueProvider.of(projectId));
}
/**
* Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore Emulator
* running locally on the specified host port.
*/
public DeleteKey withLocalhost(String localhost) {
checkArgument(localhost != null, "localhost can not be null");
return new DeleteKey(projectId, localhost);
}
/** Same as {@link DeleteKey#withProjectId(String)} but with a {@link ValueProvider}. */
public DeleteKey withProjectId(ValueProvider<String> projectId) {
checkArgument(projectId != null, "projectId can not be null");
return new DeleteKey(projectId, localhost);
}
}
/**
* A {@link PTransform} that writes mutations to Cloud Datastore.
*
* <p>It requires a {@link DoFn} that transforms an object of type {@code T} to a {@link
* Mutation}; {@code T} is usually either an {@link Entity} or a {@link Key}. <b>Note:</b> Only
* idempotent Cloud Datastore mutation operations (upsert and delete) should be used by the
* {@code DoFn} provided, as the commits are retried when failures occur.
*/
private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
protected ValueProvider<String> projectId;
@Nullable protected String localhost;
/** A function that transforms each {@code T} into a mutation. */
private final SimpleFunction<T, Mutation> mutationFn;
/**
* Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it
* is {@code null} at instantiation time, an error will be thrown.
*/
Mutate(
@Nullable ValueProvider<String> projectId,
@Nullable String localhost,
SimpleFunction<T, Mutation> mutationFn) {
this.projectId = projectId;
this.localhost = localhost;
this.mutationFn = checkNotNull(mutationFn);
}
@Override
public PDone expand(PCollection<T> input) {
checkArgument(projectId != null, "withProjectId() is required");
if (projectId.isAccessible()) {
checkArgument(projectId.get() != null, "projectId can not be null");
}
checkArgument(mutationFn != null, "mutationFn can not be null");
input
.apply("Convert to Mutation", MapElements.via(mutationFn))
.apply(
"Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId, localhost)));
return PDone.in(input.getPipeline());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("projectId", projectId)
.add("mutationFn", mutationFn.getClass().getName())
.toString();
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("projectId", projectId).withLabel("Output Project"))
.include("mutationFn", mutationFn);
}
public String getProjectId() {
return projectId.get();
}
}
/** Determines batch sizes for commit RPCs. */
@VisibleForTesting
interface WriteBatcher {
/** Call before using this WriteBatcher. */
void start();
/**
* Reports the latency of a previous commit RPC, and the number of mutations that it contained.
*/
void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations);
/** Returns the number of entities to include in the next CommitRequest. */
int nextBatchSize(long timeSinceEpochMillis);
}
/**
* Determines batch sizes for commit RPCs based on past performance.
*
* <p>It aims for a target response time per RPC: it uses the response times for previous RPCs and
* the number of entities contained in them, calculates a rolling average time-per-entity, and
* chooses the number of entities for future writes to hit the target time.
*
* <p>This enables us to send large batches without sending over-large requests in the case of
* expensive entity writes that may timeout before the server can apply them all.
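*
* <p>For example, with the 5000 ms target latency below, a recent mean latency of 25 ms per
* entity yields a next batch size of 5000 / 25 = 200 entities; the result is always clamped to
* the range [{@link DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_MIN}, {@link
* DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT}].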
*/
@VisibleForTesting
static class WriteBatcherImpl implements WriteBatcher, Serializable {
/** Target time per RPC for writes. */
static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000;
@Override
public void start() {
meanLatencyPerEntityMs =
new MovingAverage(
120000 /* sample period 2 minutes */, 10000 /* sample interval 10s */,
1 /* numSignificantBuckets */, 1 /* numSignificantSamples */);
}
@Override
public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / numMutations);
}
@Override
public int nextBatchSize(long timeSinceEpochMillis) {
if (!meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) {
return DATASTORE_BATCH_UPDATE_ENTITIES_START;
}
long recentMeanLatency = Math.max(meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1);
return (int)
Math.max(
DATASTORE_BATCH_UPDATE_ENTITIES_MIN,
Math.min(
DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT,
DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency));
}
private transient MovingAverage meanLatencyPerEntityMs;
}
/**
* {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
* batches; see {@link DatastoreV1.WriteBatcherImpl}.
*
* <p>See <a href="https://cloud.google.com/datastore/docs/concepts/entities">Datastore: Entities,
* Properties, and Keys</a> for information about entity keys and mutations.
*
* <p>Commits are non-transactional. If a commit fails because of a conflict over an entity group,
* the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES} times).
* This means that the mutation operation should be idempotent. Thus, the writer should only be
* used for {@code upsert} and {@code delete} mutation operations, as these are the only two Cloud
* Datastore mutations that are idempotent.
*/
@VisibleForTesting
static class DatastoreWriterFn extends DoFn<Mutation, Void> {
private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
private final ValueProvider<String> projectId;
@Nullable private final String localhost;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
// Current batch of mutations to be written.
private final List<Mutation> mutations = new ArrayList<>();
private int mutationsSize = 0; // Accumulated size of protos in mutations.
private WriteBatcher writeBatcher;
private transient AdaptiveThrottler throttler;
private final Counter throttledSeconds =
Metrics.counter(DatastoreWriterFn.class, "cumulativeThrottlingSeconds");
private final Counter rpcErrors =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
private final Counter rpcSuccesses =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
private static final int MAX_RETRIES = 5;
private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RETRIES)
.withInitialBackoff(Duration.standardSeconds(5));
DatastoreWriterFn(String projectId, @Nullable String localhost) {
this(
StaticValueProvider.of(projectId),
localhost,
new V1DatastoreFactory(),
new WriteBatcherImpl());
}
DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost) {
this(projectId, localhost, new V1DatastoreFactory(), new WriteBatcherImpl());
}
@VisibleForTesting
DatastoreWriterFn(
ValueProvider<String> projectId,
@Nullable String localhost,
V1DatastoreFactory datastoreFactory,
WriteBatcher writeBatcher) {
this.projectId = checkNotNull(projectId, "projectId");
this.localhost = localhost;
this.datastoreFactory = datastoreFactory;
this.writeBatcher = writeBatcher;
}
@StartBundle
public void startBundle(StartBundleContext c) {
datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost);
writeBatcher.start();
if (throttler == null) {
// Initialize throttler at first use, because it is not serializable.
throttler = new AdaptiveThrottler(120000, 10000, 1.25);
}
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Mutation write = c.element();
int size = write.getSerializedSize();
if (mutations.size() > 0
&& mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
flushBatch();
}
mutations.add(c.element());
mutationsSize += size;
if (mutations.size() >= writeBatcher.nextBatchSize(System.currentTimeMillis())) {
flushBatch();
}
}
@FinishBundle
public void finishBundle() throws Exception {
if (!mutations.isEmpty()) {
flushBatch();
}
}
/**
* Writes a batch of mutations to Cloud Datastore.
*
* <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All mutations in
* the batch will be committed again, even if the commit was partially successful. If the retry
* limit is exceeded, the last exception from Cloud Datastore will be thrown.
*
* @throws DatastoreException if the commit fails on the last retry
* @throws IOException if backing off between retries fails
* @throws InterruptedException if backing off between retries fails
*/
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.debug("Writing batch of {} mutations", mutations.size());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
while (true) {
// Batch upsert entities.
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.addAllMutations(mutations);
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
long startTime = System.currentTimeMillis(), endTime;
if (throttler.throttleRequest(startTime)) {
LOG.info("Delaying request due to previous failures");
throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
continue;
}
try {
datastore.commit(commitRequest.build());
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
throttler.successfulRequest(startTime);
rpcSuccesses.inc();
// Break if the commit threw no exception.
break;
} catch (DatastoreException exception) {
if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
/* Most errors are not related to request size, and should not change our expectation of
* the latency of successful requests. DEADLINE_EXCEEDED can be taken into
* consideration, though. */
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
}
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error(
"Error writing batch of {} mutations to Datastore ({}): {}",
mutations.size(),
exception.getCode(),
exception.getMessage());
rpcErrors.inc();
if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
throw exception;
}
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error("Aborting after {} retries.", MAX_RETRIES);
throw exception;
}
}
}
LOG.debug("Successfully wrote {} mutations", mutations.size());
mutations.clear();
mutationsSize = 0;
}
@Override
public void populateDisplayData(Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(DisplayData.item("projectId", projectId).withLabel("Output Project"));
}
}
/**
* Returns true if a Cloud Datastore key is complete. A key is complete if its last element has
* either an id or a name.
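*
* <p>For example, a key whose last path element has kind {@code Person} and {@code name =
* "alice"} is complete, while a key whose last element carries only the kind (no id and no
* name) is incomplete; the kind name is illustrative.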
*/
static boolean isValidKey(Key key) {
List<PathElement> elementList = key.getPathList();
if (elementList.isEmpty()) {
return false;
}
PathElement lastElement = elementList.get(elementList.size() - 1);
return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
}
/** A function that constructs an upsert {@link Mutation} from an {@link Entity}. */
@VisibleForTesting
static class UpsertFn extends SimpleFunction<Entity, Mutation> {
@Override
public Mutation apply(Entity entity) {
// Verify that the entity to write has a complete key.
checkArgument(
isValidKey(entity.getKey()),
"Entities to be written to the Cloud Datastore must have complete keys:\n%s",
entity);
return makeUpsert(entity).build();
}
@Override
public void populateDisplayData(Builder builder) {
builder.add(
DisplayData.item("upsertFn", this.getClass()).withLabel("Create Upsert Mutation"));
}
}
/** A function that constructs a delete {@link Mutation} from an {@link Entity}. */
@VisibleForTesting
static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
@Override
public Mutation apply(Entity entity) {
// Verify that the entity to delete has a complete key.
checkArgument(
isValidKey(entity.getKey()),
"Entities to be deleted from the Cloud Datastore must have complete keys:\n%s",
entity);
return makeDelete(entity.getKey()).build();
}
@Override
public void populateDisplayData(Builder builder) {
builder.add(
DisplayData.item("deleteEntityFn", this.getClass()).withLabel("Create Delete Mutation"));
}
}
/** A function that constructs a delete {@link Mutation} from a {@link Key}. */
@VisibleForTesting
static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
@Override
public Mutation apply(Key key) {
// Verify that the key to delete is complete.
checkArgument(
isValidKey(key),
"Keys to be deleted from the Cloud Datastore must be complete:\n%s",
key);
return makeDelete(key).build();
}
@Override
public void populateDisplayData(Builder builder) {
builder.add(
DisplayData.item("deleteKeyFn", this.getClass()).withLabel("Create Delete Mutation"));
}
}
/**
* A wrapper factory class for the Cloud Datastore singleton classes {@link DatastoreFactory} and
* {@link QuerySplitter}.
*
* <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java-serializable, hence they are
* wrapped in this class, which implements {@link Serializable}.
*/
@VisibleForTesting
static class V1DatastoreFactory implements Serializable {
/** Builds a Cloud Datastore client for the given pipeline options and project. */
public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
return getDatastore(pipelineOptions, projectId, null);
}
/**
* Builds a Cloud Datastore client for the given pipeline options, project, and an optional
* localhost.
*/
public Datastore getDatastore(
PipelineOptions pipelineOptions, String projectId, @Nullable String localhost) {
Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
HttpRequestInitializer initializer;
if (credential != null) {
initializer =
new ChainingHttpRequestInitializer(
new HttpCredentialsAdapter(credential), new RetryHttpRequestInitializer());
} else {
initializer = new RetryHttpRequestInitializer();
}
DatastoreOptions.Builder builder =
new DatastoreOptions.Builder().projectId(projectId).initializer(initializer);
if (localhost != null) {
builder.localHost(localhost);
} else {
builder.host("batch-datastore.googleapis.com");
}
return DatastoreFactory.get().create(builder.build());
}
/** Builds a Cloud Datastore {@link QuerySplitter}. */
public QuerySplitter getQuerySplitter() {
return DatastoreHelper.getQuerySplitter();
}
}
}