blob: ea00821f36045229000c03aa557b5d0794dc0052 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.datastore;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.NUM_QUERY_SPLITS_MIN;
import static org.junit.Assert.assertEquals;
import com.google.datastore.v1.Query;
import java.util.List;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Integration tests for {@link DatastoreV1.Read.SplitQueryFn}.
 *
 * <p>It is hard to mock the exact behavior of Cloud Datastore, especially for the statistics
 * queries. In addition, DatastoreIO falls back gracefully when querying statistics fails, which
 * makes such issues hard to catch in production. This test therefore interacts with Cloud
 * Datastore directly, queries the actual stats and verifies that the SplitQueryFn generates the
 * expected number of query splits.
 *
 * <p>These tests are brittle as they rely on statistics data in Cloud Datastore. If the data gets
 * lost or changes then they will begin failing and this test should be disabled. At the time of
 * writing, the Cloud Datastore has the following statistics:
 *
 * <ul>
 *   <li>kind = sort_1G, entity_bytes = 2130000000, count = 10000000
 *   <li>kind = shakespeare, entity_bytes = 26383451, count = 172948
 * </ul>
 */
@RunWith(JUnit4.class)
public class SplitQueryFnIT {
  /** Cloud project that hosts the Datastore kinds queried by these tests. */
  private static final String PROJECT_ID = "apache-beam-testing";

  /**
   * Read timestamp used for the "read at a point in time" variant of each test: ten seconds
   * before this test instance was created.
   */
  private final Instant readTime = Instant.now().minus(Duration.standardSeconds(10));

  /** Tests {@link SplitQueryFn} to generate expected number of splits for a large dataset. */
  @Test
  public void testSplitQueryFnWithLargeDataset() throws Exception {
    String kind = "sort_1G";
    String namespace = null;
    // Num splits is computed based on the entity_bytes size of the sort_1G kind reported by
    // Datastore stats.
    int expectedNumSplits = 32;
    // Exercise both the plain query and the query pinned to a read time.
    testSplitQueryFn(PROJECT_ID, kind, namespace, expectedNumSplits, null);
    testSplitQueryFn(PROJECT_ID, kind, namespace, expectedNumSplits, readTime);
  }

  /** Tests {@link SplitQueryFn} to fallback to NUM_QUERY_SPLITS_MIN for a small dataset. */
  @Test
  public void testSplitQueryFnWithSmallDataset() throws Exception {
    String kind = "shakespeare";
    String namespace = null;
    int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
    // Exercise both the plain query and the query pinned to a read time.
    testSplitQueryFn(PROJECT_ID, kind, namespace, expectedNumSplits, null);
    testSplitQueryFn(PROJECT_ID, kind, namespace, expectedNumSplits, readTime);
  }

  /**
   * A helper method to test {@link SplitQueryFn} to generate the expected number of splits.
   *
   * <p>Builds a query over {@code kind}, runs it through a {@link SplitQueryFn} as a single
   * bundle, and asserts on the number of queries emitted.
   *
   * @param projectId project whose Datastore is queried
   * @param kind the entity kind the query selects
   * @param namespace namespace passed to the function's options, or {@code null}
   * @param expectedNumSplits number of output queries the function is expected to emit
   * @param queryReadTime read time handed to the function, or {@code null}
   * @throws Exception if running the bundle or tearing the function down fails
   */
  private void testSplitQueryFn(
      String projectId,
      String kind,
      @Nullable String namespace,
      int expectedNumSplits,
      @Nullable Instant queryReadTime)
      throws Exception {
    Query.Builder query = Query.newBuilder();
    query.addKindBuilder().setName(kind);

    // NOTE(review): the 0 presumably asks SplitQueryFn to derive the split count itself (from
    // Datastore statistics) rather than use a caller-fixed value -- confirm against SplitQueryFn.
    SplitQueryFn splitQueryFn =
        new SplitQueryFn(V1Options.from(projectId, namespace, null), 0, queryReadTime);

    // Close the tester so the wrapped function is torn down even if the assertion fails.
    try (DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn)) {
      List<Query> queries = doFnTester.processBundle(query.build());
      assertEquals(
          "Unexpected number of query splits for kind " + kind + " (readTime=" + queryReadTime + ")",
          expectedNumSplits,
          queries.size());
    }
  }
  // TODO (vikasrk): Create datasets under a different namespace and add tests.
}