blob: 55b53b3f0c915d643740f9f008c5afa48b3b5c8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.datastore;
import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore;
import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey;
import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.client.Datastore;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder;
import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** End-to-end tests for Datastore DatastoreV1.Read. */
@RunWith(JUnit4.class)
public class V1ReadIT {
private V1TestOptions options;
private String project;
private String ancestor;
private final long numEntities = 1000;
@Before
public void setup() throws Exception {
PipelineOptionsFactory.register(V1TestOptions.class);
options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
ancestor = UUID.randomUUID().toString();
// Create entities and write them to datastore
writeEntitiesToDatastore(options, project, ancestor, numEntities);
}
@After
public void tearDown() throws Exception {
deleteAllEntities(options, project, ancestor);
}
/**
* An end-to-end test for {@link DatastoreV1.Read#withQuery(Query)}
*
* <p>Write some test entities to datastore and then run a pipeline that reads and counts the
* total number of entities. Verify that the count matches the number of entities written.
*/
@Test
public void testE2EV1Read() throws Exception {
// Read from datastore
Query query =
V1TestUtil.makeAncestorKindQuery(options.getKind(), options.getNamespace(), ancestor);
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withProjectId(project)
.withQuery(query)
.withNamespace(options.getNamespace());
// Count the total number of entities
Pipeline p = Pipeline.create(options);
PCollection<Long> count = p.apply(read).apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numEntities);
p.run();
}
@Test
public void testE2EV1ReadWithGQLQueryWithNoLimit() throws Exception {
testE2EV1ReadWithGQLQuery(0);
}
@Test
public void testE2EV1ReadWithGQLQueryWithLimit() throws Exception {
testE2EV1ReadWithGQLQuery(99);
}
/**
* An end-to-end test for {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
*
* <p>Write some test entities to datastore and then run a pipeline that reads and counts the
* total number of entities. Verify that the count matches the number of entities written.
*/
private void testE2EV1ReadWithGQLQuery(long limit) throws Exception {
String gqlQuery =
String.format(
"SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')",
options.getKind(), options.getKind(), ancestor);
long expectedNumEntities = numEntities;
if (limit > 0) {
gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit);
expectedNumEntities = limit;
}
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
.withProjectId(project)
.withLiteralGqlQuery(gqlQuery)
.withNamespace(options.getNamespace());
// Count the total number of entities
Pipeline p = Pipeline.create(options);
PCollection<Long> count = p.apply(read).apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(expectedNumEntities);
p.run();
}
// Creates entities and write them to datastore
private static void writeEntitiesToDatastore(
V1TestOptions options, String project, String ancestor, long numEntities) throws Exception {
Datastore datastore = getDatastore(options, project);
// Write test entities to datastore
V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
for (long i = 0; i < numEntities; i++) {
Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
writer.write(entity);
}
writer.close();
}
}