/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.datastore;
import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.translateGqlQueryWithLimitCheck;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.CommitResponse;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.GqlQuery;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Mutation;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
import com.google.rpc.Code;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKey;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKeyFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.ReadFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link DatastoreV1}. */
@RunWith(JUnit4.class)
public class DatastoreV1Test {
private static final String PROJECT_ID = "testProject";
private static final String NAMESPACE = "testNamespace";
private static final String KIND = "testKind";
private static final Query QUERY;
private static final String LOCALHOST = "localhost:9955";
private static final String GQL_QUERY = "SELECT * from " + KIND;
private static final V1Options V_1_OPTIONS;
static {
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(KIND);
QUERY = q.build();
V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null);
}
@Mock private Datastore mockDatastore;
@Mock QuerySplitter mockQuerySplitter;
@Mock V1DatastoreFactory mockDatastoreFactory;
@Rule public final ExpectedException thrown = ExpectedException.none();
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockDatastoreFactory.getDatastore(
any(PipelineOptions.class), any(String.class), any(String.class)))
.thenReturn(mockDatastore);
when(mockDatastoreFactory.getQuerySplitter()).thenReturn(mockQuerySplitter);
}
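  /** Tests building a Read with a query using builder methods. */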
@Test
public void testBuildRead() throws Exception {
DatastoreV1.Read read =
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
assertEquals(QUERY, read.getQuery());
assertEquals(PROJECT_ID, read.getProjectId().get());
assertEquals(NAMESPACE, read.getNamespace().get());
}
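  /** Tests building a Read with a literal GQL query using builder methods. */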
@Test
public void testBuildReadWithGqlQuery() throws Exception {
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withProjectId(PROJECT_ID)
.withLiteralGqlQuery(GQL_QUERY)
.withNamespace(NAMESPACE);
assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
assertEquals(PROJECT_ID, read.getProjectId().get());
assertEquals(NAMESPACE, read.getNamespace().get());
}
/** {@link #testBuildRead} but constructed in a different order. */
@Test
public void testBuildReadAlt() throws Exception {
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withQuery(QUERY)
.withNamespace(NAMESPACE)
.withProjectId(PROJECT_ID)
.withLocalhost(LOCALHOST);
assertEquals(QUERY, read.getQuery());
assertEquals(PROJECT_ID, read.getProjectId().get());
assertEquals(NAMESPACE, read.getNamespace().get());
assertEquals(LOCALHOST, read.getLocalhost());
}
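  /** Tests that validation fails when both a query and a literal GQL query are set. */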
@Test
public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withProjectId(PROJECT_ID)
.withLiteralGqlQuery(GQL_QUERY)
.withQuery(QUERY);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
read.expand(null);
}
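  /** Tests that a query with a limit of zero is rejected. */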
@Test
public void testReadValidationFailsQueryLimitZero() throws Exception {
Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid query limit 0: must be positive");
DatastoreIO.v1().read().withQuery(invalidLimit);
}
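  /** Tests that a query with a negative limit is rejected. */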
@Test
public void testReadValidationFailsQueryLimitNegative() throws Exception {
Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid query limit -5: must be positive");
DatastoreIO.v1().read().withQuery(invalidLimit);
}
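  /** Tests the display data for a query-based Read. */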
@Test
public void testReadDisplayData() {
DatastoreV1.Read read =
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
}
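  /** Tests the display data for a GQL-based Read. */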
@Test
public void testReadDisplayDataWithGqlQuery() {
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withProjectId(PROJECT_ID)
.withLiteralGqlQuery(GQL_QUERY)
.withNamespace(NAMESPACE);
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
}
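  /** Tests the primitive display data of the Read source transform. */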
@Test
public void testSourcePrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
int numSplits = 98;
PTransform<PBegin, PCollection<Entity>> read =
DatastoreIO.v1()
.read()
.withProjectId(PROJECT_ID)
.withQuery(Query.newBuilder().build())
.withNumQuerySplits(numSplits);
String assertMessage = "DatastoreIO read should include the '%s' in its primitive display data";
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat(
String.format(assertMessage, "project id"),
displayData,
hasItem(hasDisplayItem("projectId", PROJECT_ID)));
assertThat(
String.format(assertMessage, "number of query splits"),
displayData,
hasItem(hasDisplayItem("numQuerySplits", numSplits)));
}
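  /** Tests the display data for Write. */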
@Test
public void testWriteDisplayData() {
Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
}
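  /** Tests the display data for DeleteEntity. */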
@Test
public void testDeleteEntityDisplayData() {
DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
DisplayData displayData = DisplayData.from(deleteEntity);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
}
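  /** Tests the display data for DeleteKey. */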
@Test
public void testDeleteKeyDisplayData() {
DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
DisplayData displayData = DisplayData.from(deleteKey);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
}
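  /** Tests the primitive display data for Write. */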
@Test
public void testWritePrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
PTransform<PCollection<Entity>, ?> write = DatastoreIO.v1().write().withProjectId("myProject");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat(
"DatastoreIO write should include the project in its primitive display data",
displayData,
hasItem(hasDisplayItem("projectId")));
assertThat(
"DatastoreIO write should include the upsertFn in its primitive display data",
displayData,
hasItem(hasDisplayItem("upsertFn")));
}
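  /** Tests the primitive display data for DeleteEntity. */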
@Test
public void testDeleteEntityPrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
PTransform<PCollection<Entity>, ?> write =
DatastoreIO.v1().deleteEntity().withProjectId("myProject");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat(
"DatastoreIO write should include the project in its primitive display data",
displayData,
hasItem(hasDisplayItem("projectId")));
assertThat(
"DatastoreIO write should include the deleteEntityFn in its primitive display data",
displayData,
hasItem(hasDisplayItem("deleteEntityFn")));
}
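  /** Tests the primitive display data for DeleteKey. */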
@Test
public void testDeleteKeyPrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
PTransform<PCollection<Key>, ?> write = DatastoreIO.v1().deleteKey().withProjectId("myProject");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat(
"DatastoreIO write should include the project in its primitive display data",
displayData,
hasItem(hasDisplayItem("projectId")));
assertThat(
"DatastoreIO write should include the deleteKeyFn in its primitive display data",
displayData,
hasItem(hasDisplayItem("deleteKeyFn")));
}
/** Test building a Write using builder methods. */
@Test
public void testBuildWrite() throws Exception {
DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
assertEquals(PROJECT_ID, write.getProjectId());
}
/** Test the detection of complete and incomplete keys. */
@Test
public void testHasNameOrId() {
Key key;
// Complete with name, no ancestor
key = makeKey("bird", "finch").build();
assertTrue(isValidKey(key));
// Complete with id, no ancestor
key = makeKey("bird", 123).build();
assertTrue(isValidKey(key));
// Incomplete, no ancestor
key = makeKey("bird").build();
assertFalse(isValidKey(key));
// Complete with name and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", "horned").build();
assertTrue(isValidKey(key));
// Complete with id and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", 123).build();
assertTrue(isValidKey(key));
// Incomplete with ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird").build();
assertFalse(isValidKey(key));
key = makeKey().build();
assertFalse(isValidKey(key));
}
/** Test that entities with incomplete keys cannot be updated. */
@Test
public void testAddEntitiesWithIncompleteKeys() throws Exception {
Key key = makeKey("bird").build();
Entity entity = Entity.newBuilder().setKey(key).build();
UpsertFn upsertFn = new UpsertFn();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys");
upsertFn.apply(entity);
}
  /** Test that entities with valid keys are transformed to upsert mutations. */
  @Test
  public void testAddEntities() throws Exception {
Key key = makeKey("bird", "finch").build();
Entity entity = Entity.newBuilder().setKey(key).build();
UpsertFn upsertFn = new UpsertFn();
    Mutation expectedMutation = makeUpsert(entity).build();
    assertEquals(expectedMutation, upsertFn.apply(entity));
}
/** Test that entities with incomplete keys cannot be deleted. */
@Test
public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
Key key = makeKey("bird").build();
Entity entity = Entity.newBuilder().setKey(key).build();
DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Entities to be deleted from the Cloud Datastore must have complete keys");
deleteEntityFn.apply(entity);
}
/** Test that entities with valid keys are transformed to delete mutations. */
@Test
public void testDeleteEntities() throws Exception {
Key key = makeKey("bird", "finch").build();
Entity entity = Entity.newBuilder().setKey(key).build();
DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
    Mutation expectedMutation = makeDelete(entity.getKey()).build();
    assertEquals(expectedMutation, deleteEntityFn.apply(entity));
}
/** Test that incomplete keys cannot be deleted. */
@Test
public void testDeleteIncompleteKeys() throws Exception {
Key key = makeKey("bird").build();
DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete");
deleteKeyFn.apply(key);
}
/** Test that valid keys are transformed to delete mutations. */
@Test
public void testDeleteKeys() {
Key key = makeKey("bird", "finch").build();
DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
    Mutation expectedMutation = makeDelete(key).build();
    assertEquals(expectedMutation, deleteKeyFn.apply(key));
}
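  /** Tests the display data for {@link DatastoreWriterFn}. */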
@Test
public void testDatastoreWriteFnDisplayData() {
DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
DisplayData displayData = DisplayData.from(datastoreWriter);
assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
}
  /** Tests {@link DatastoreWriterFn} with fewer entities than one batch. */
  @Test
  public void testDatastoreWriterFnWithOneBatch() throws Exception {
datastoreWriterFnTest(100);
}
  /**
   * Tests {@link DatastoreWriterFn} with more entities than one batch, but not an exact multiple
   * of the batch size.
   */
  @Test
  public void testDatastoreWriterFnWithMultipleBatches() throws Exception {
datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
}
/**
* Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
* write batch size.
*/
@Test
  public void testDatastoreWriterFnWithBatchesExactMultiple() throws Exception {
datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
}
// A helper method to test DatastoreWriterFn for various batch sizes.
private void datastoreWriterFnTest(int numMutations) throws Exception {
// Create the requested number of mutations.
List<Mutation> mutations = new ArrayList<>(numMutations);
for (int i = 0; i < numMutations; ++i) {
mutations.add(
makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
}
DatastoreWriterFn datastoreWriter =
new DatastoreWriterFn(
StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(mutations);
int start = 0;
while (start < numMutations) {
int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
commitRequest.addAllMutations(mutations.subList(start, end));
// Verify all the batch requests were made with the expected mutations.
verify(mockDatastore, times(1)).commit(commitRequest.build());
start = end;
}
}
/**
* Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
*/
@Test
  public void testDatastoreWriterFnWithLargeEntities() throws Exception {
List<Mutation> mutations = new ArrayList<>();
int entitySize = 0;
for (int i = 0; i < 12; ++i) {
Entity entity =
Entity.newBuilder()
.setKey(makeKey("key" + i, i + 1))
.putProperties(
"long",
makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build())
.build();
entitySize = entity.getSerializedSize(); // Take the size of any one entity.
mutations.add(makeUpsert(entity).build());
}
DatastoreWriterFn datastoreWriter =
new DatastoreWriterFn(
StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(mutations);
// This test is over-specific currently; it requires that we split the 12 entity writes into 3
// requests, but we only need each CommitRequest to be less than 10MB in size.
int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
int start = 0;
while (start < mutations.size()) {
int end = Math.min(mutations.size(), start + entitiesPerRpc);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
commitRequest.addAllMutations(mutations.subList(start, end));
// Verify all the batch requests were made with the expected mutations.
verify(mockDatastore).commit(commitRequest.build());
start = end;
}
}
  /** Tests {@link DatastoreWriterFn} with a failed request that is retried. */
  @Test
  public void testDatastoreWriterFnRetriesErrors() throws Exception {
List<Mutation> mutations = new ArrayList<>();
int numRpcs = 2;
for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
mutations.add(
makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
}
CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
when(mockDatastore.commit(any(CommitRequest.class)))
.thenReturn(successfulCommit)
.thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
.thenReturn(successfulCommit);
DatastoreWriterFn datastoreWriter =
new DatastoreWriterFn(
StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(mutations);
}
  /**
   * Tests that {@link DatastoreV1.Read#getEstimatedSizeBytes} fetches and returns the estimated
   * size for a query.
   */
@Test
public void testEstimatedSizeBytes() throws Exception {
long entityBytes = 100L;
// In seconds
long timestamp = 1234L;
RunQueryRequest latestTimestampRequest =
makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
// Per Kind statistics request and response
RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
verify(mockDatastore, times(1)).runQuery(statRequest);
}
/** Tests {@link SplitQueryFn} when number of query splits is specified. */
@Test
public void testSplitQueryFnWithNumSplits() throws Exception {
int numSplits = 100;
when(mockQuerySplitter.getSplits(
eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
.thenReturn(splitQuery(QUERY, numSplits));
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
    /*
     * Although the Datastore client is marked transient in SplitQueryFn, the mock injected through
     * the factory is not serializable because it lacks a no-arg constructor. Disable cloning to
     * prevent the DoFn from being serialized.
     */
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<Query> queries = doFnTester.processBundle(QUERY);
    assertEquals(numSplits, queries.size());
    // Confirm that the sub-queries are not equal to the original query when there is more than
    // one split.
    for (Query subQuery : queries) {
      assertNotEquals(QUERY, subQuery);
    }
verify(mockQuerySplitter, times(1))
.getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
verifyZeroInteractions(mockDatastore);
}
  /** Tests {@link SplitQueryFn} when the number of query splits is not specified. */
@Test
public void testSplitQueryFnWithoutNumSplits() throws Exception {
// Force SplitQueryFn to compute the number of query splits
int numSplits = 0;
int expectedNumSplits = 20;
long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
// In seconds
long timestamp = 1234L;
RunQueryRequest latestTimestampRequest =
makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
// Per Kind statistics request and response
RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
when(mockQuerySplitter.getSplits(
eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
.thenReturn(splitQuery(QUERY, expectedNumSplits));
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<Query> queries = doFnTester.processBundle(QUERY);
assertEquals(expectedNumSplits, queries.size());
verify(mockQuerySplitter, times(1))
.getSplits(eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
verify(mockDatastore, times(1)).runQuery(statRequest);
}
/** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */
@Test
public void testSplitQueryFnWithQueryLimit() throws Exception {
Query queryWithLimit = QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<Query> queries = doFnTester.processBundle(queryWithLimit);
assertEquals(1, queries.size());
verifyNoMoreInteractions(mockDatastore);
verifyNoMoreInteractions(mockQuerySplitter);
}
  /** Tests {@link ReadFn} with a query limit of less than one batch. */
@Test
public void testReadFnWithOneBatch() throws Exception {
readFnTest(5);
}
  /** Tests {@link ReadFn} with a query limit of more than one batch, but not an exact multiple. */
@Test
public void testReadFnWithMultipleBatches() throws Exception {
readFnTest(QUERY_BATCH_LIMIT + 5);
}
/** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
@Test
public void testReadFnWithBatchesExactMultiple() throws Exception {
readFnTest(5 * QUERY_BATCH_LIMIT);
}
/** Tests that {@link ReadFn} retries after an error. */
@Test
public void testReadFnRetriesErrors() throws Exception {
    // A bare query (no kind or filter) with a limit of 1.
    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
// Use mockResponseForQuery to generate results.
when(mockDatastore.runQuery(any(RunQueryRequest.class)))
.thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
.thenAnswer(
invocationOnMock -> {
Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
return mockResponseForQuery(q);
});
ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
doFnTester.processBundle(query);
}
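  /**
   * Tests that a GQL query that already has a limit is translated by running the original query
   * after the "LIMIT 0" probe fails.
   */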
@Test
public void testTranslateGqlQueryWithLimit() throws Exception {
String gql = "SELECT * from DummyKind LIMIT 10";
String gqlWithZeroLimit = gql + " LIMIT 0";
GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
GqlQuery gqlQueryWithZeroLimit =
GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace());
RunQueryRequest gqlRequestWithZeroLimit =
makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
.thenThrow(
new DatastoreException(
"runQuery",
Code.INVALID_ARGUMENT,
"invalid query",
// dummy
new RuntimeException()));
when(mockDatastore.runQuery(gqlRequest))
.thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
    assertEquals(
        QUERY, translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()));
verify(mockDatastore, times(1)).runQuery(gqlRequest);
verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
}
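  /**
   * Tests that a GQL query without a limit is translated by appending "LIMIT 0" and running the
   * resulting query.
   */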
@Test
public void testTranslateGqlQueryWithNoLimit() throws Exception {
String gql = "SELECT * from DummyKind";
String gqlWithZeroLimit = gql + " LIMIT 0";
GqlQuery gqlQueryWithZeroLimit =
GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
RunQueryRequest gqlRequestWithZeroLimit =
makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
.thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
    assertEquals(
        QUERY, translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()));
verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
}
  /** Test options. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getDatastoreProject();
void setDatastoreProject(ValueProvider<String> value);
ValueProvider<String> getGqlQuery();
void setGqlQuery(ValueProvider<String> value);
ValueProvider<String> getNamespace();
void setNamespace(ValueProvider<String> value);
}
/**
* Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
* when built with {@link DatastoreV1.Read#withQuery(Query)}.
*/
@Test
public void testRuntimeOptionsNotCalledInApplyQuery() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
Pipeline pipeline = TestPipeline.create(options);
pipeline
.apply(
DatastoreIO.v1()
.read()
.withProjectId(options.getDatastoreProject())
.withQuery(QUERY)
.withNamespace(options.getNamespace()))
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
}
/**
* Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
* when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
*/
@Test
public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
Pipeline pipeline = TestPipeline.create(options);
pipeline
.apply(
DatastoreIO.v1()
.read()
.withProjectId(options.getDatastoreProject())
.withLiteralGqlQuery(options.getGqlQuery()))
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
}
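  /** Tests that the write batcher returns the starting batch size when no latency data exists. */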
@Test
public void testWriteBatcherWithoutData() {
DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
writeBatcher.start();
assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
}
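  /** Tests that consistently fast requests ramp the batch size up to the maximum. */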
@Test
public void testWriteBatcherFastQueries() {
DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
writeBatcher.start();
writeBatcher.addRequestLatency(0, 1000, 200);
writeBatcher.addRequestLatency(0, 1000, 200);
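    // Two samples of 200 mutations in 1000 ms are fast enough that the computed batch size is
    // clamped to the upper limit (assuming the batcher targets roughly 5 seconds of work per
    // request).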
assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
}
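  /** Tests that consistently slow requests shrink the batch size. */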
@Test
public void testWriteBatcherSlowQueries() {
DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
writeBatcher.start();
writeBatcher.addRequestLatency(0, 10000, 200);
writeBatcher.addRequestLatency(0, 10000, 200);
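    // 200 mutations in 10000 ms is 0.02 mutations/ms; assuming a target of roughly 5 seconds of
    // work per request, that works out to a batch size of 100.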
assertEquals(100, writeBatcher.nextBatchSize(0));
}
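  /** Tests that the batch size never drops below the configured minimum. */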
@Test
public void testWriteBatcherSizeNotBelowMinimum() {
DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
writeBatcher.start();
writeBatcher.addRequestLatency(0, 30000, 50);
writeBatcher.addRequestLatency(0, 30000, 50);
assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
}
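  /** Tests that latency samples older than the sliding window are discarded. */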
@Test
public void testWriteBatcherSlidingWindow() {
DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
writeBatcher.start();
writeBatcher.addRequestLatency(0, 30000, 50);
writeBatcher.addRequestLatency(50000, 5000, 200);
writeBatcher.addRequestLatency(100000, 5000, 200);
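    // Assuming a sliding window of roughly two minutes, the slow sample at t=0 has aged out by
    // t=150000, so only the two fast samples (200 mutations in 5000 ms each) determine the size.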
assertEquals(200, writeBatcher.nextBatchSize(150000));
}
  // Helper methods.

  /** A helper function that verifies that all the queries have unique keys. */
private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
Set<Integer> keys = new HashSet<>();
for (KV<Integer, Query> kv : queries) {
keys.add(kv.getKey());
}
    assertEquals(queries.size(), keys.size());
}
/**
* A helper function that creates mock {@link Entity} results in response to a query. Always
* indicates that more results are available, unless the batch is limited to fewer than {@link
* DatastoreV1.Read#QUERY_BATCH_LIMIT} results.
*/
private static RunQueryResponse mockResponseForQuery(Query q) {
// Every query DatastoreV1 sends should have a limit.
assertTrue(q.hasLimit());
// The limit should be in the range [1, QUERY_BATCH_LIMIT]
int limit = q.getLimit().getValue();
assertThat(limit, greaterThanOrEqualTo(1));
assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
// Create the requested number of entities.
List<EntityResult> entities = new ArrayList<>(limit);
for (int i = 0; i < limit; ++i) {
entities.add(
EntityResult.newBuilder()
.setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
.build());
}
// Fill out the other parameters on the returned result batch.
RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
ret.getBatchBuilder()
.addAllEntityResults(entities)
.setEntityResultType(EntityResult.ResultType.FULL)
.setMoreResults(
limit == QUERY_BATCH_LIMIT
? QueryResultBatch.MoreResultsType.NOT_FINISHED
: QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
return ret.build();
}
/** Helper function to run a test reading from a {@link ReadFn}. */
private void readFnTest(int numEntities) throws Exception {
    // A bare query (no kind or filter) limited to the requested number of entities.
    Query query =
        Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(numEntities)).build();
// Use mockResponseForQuery to generate results.
when(mockDatastore.runQuery(any(RunQueryRequest.class)))
.thenAnswer(
invocationOnMock -> {
Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
return mockResponseForQuery(q);
});
ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
    /*
     * Although the Datastore client is marked transient in ReadFn, the mock injected through the
     * factory is not serializable because it lacks a no-arg constructor. Disable cloning to
     * prevent the test object from being serialized.
     */
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<Entity> entities = doFnTester.processBundle(query);
int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
// Validate the number of results.
assertEquals(numEntities, entities.size());
}
/** Builds a per-kind statistics response with the given entity size. */
private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
RunQueryResponse.Builder statKindResponse = RunQueryResponse.newBuilder();
Entity.Builder entity = Entity.newBuilder();
entity.setKey(makeKey("dummyKind", "dummyId"));
entity.putProperties("entity_bytes", makeValue(entitySizeInBytes).build());
EntityResult.Builder entityResult = EntityResult.newBuilder();
entityResult.setEntity(entity);
QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
batch.addEntityResults(entityResult);
statKindResponse.setBatch(batch);
return statKindResponse.build();
}
/** Builds a response of the given timestamp. */
private static RunQueryResponse makeLatestTimestampResponse(long timestamp) {
RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
Entity.Builder entity = Entity.newBuilder();
entity.setKey(makeKey("dummyKind", "dummyId"));
entity.putProperties("timestamp", makeValue(new Date(timestamp * 1000)).build());
EntityResult.Builder entityResult = EntityResult.newBuilder();
entityResult.setEntity(entity);
QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
batch.addEntityResults(entityResult);
timestampResponse.setBatch(batch);
return timestampResponse.build();
}
/** Builds a per-kind statistics query for the given timestamp and namespace. */
private static Query makeStatKindQuery(String namespace, long timestamp) {
Query.Builder statQuery = Query.newBuilder();
if (namespace == null) {
statQuery.addKindBuilder().setName("__Stat_Kind__");
} else {
statQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
}
statQuery.setFilter(
makeAndFilter(
makeFilter("kind_name", EQUAL, makeValue(KIND).build()).build(),
makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L).build()).build()));
return statQuery.build();
}
/** Builds a latest timestamp statistics query. */
private static Query makeLatestTimestampQuery(String namespace) {
Query.Builder timestampQuery = Query.newBuilder();
if (namespace == null) {
timestampQuery.addKindBuilder().setName("__Stat_Total__");
} else {
timestampQuery.addKindBuilder().setName("__Stat_Ns_Total__");
}
timestampQuery.addOrder(makeOrder("timestamp", DESCENDING));
timestampQuery.setLimit(Int32Value.newBuilder().setValue(1));
return timestampQuery.build();
}
/** Generate dummy query splits. */
private List<Query> splitQuery(Query query, int numSplits) {
List<Query> queries = new ArrayList<>();
int offsetOfOriginal = query.getOffset();
for (int i = 0; i < numSplits; i++) {
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(KIND);
// Making sub-queries unique (and not equal to the original query) by setting different
// offsets.
q.setOffset(++offsetOfOriginal);
queries.add(q.build());
}
return queries;
}
/**
* A WriteBatcher for unit tests, which does no timing-based adjustments (so unit tests have
* consistent results).
*/
static class FakeWriteBatcher implements DatastoreV1.WriteBatcher {
@Override
public void start() {}
@Override
public void addRequestLatency(
long timeSinceEpochMillis, long latencyMillis, int numMutations) {}
@Override
public int nextBatchSize(long timeSinceEpochMillis) {
return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
}
}
}