// blob: 32748468d7e51d9704e2984c6a67500aa51db944 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE;
import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.PropertyFilter.Operator;
import com.google.datastore.v1.Query;
import java.util.UUID;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Integration tests that write to and read from Datastore, both through Beam SQL external tables
 * of type {@code datastoreV1} and through {@link DatastoreIO} directly.
 *
 * <p>The target project is read from the testing pipeline options (see {@link #options}); every
 * table location used here has the form {@code <project>/<kind>}.
 */
@RunWith(JUnit4.class)
public class DataStoreReadWriteIT {
  /** Testing pipeline options; in this class they are only used to look up the project id. */
  private static final BigQueryOptions options =
      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);

  /** Schema of the two-column test table: the serialized key plus a string payload. */
  private static final Schema SOURCE_SCHEMA =
      Schema.builder()
          .addNullableField("__key__", VARBINARY)
          .addNullableField("content", STRING)
          .build();

  /** Schema of the test table when the {@code __key__} column is not declared. */
  private static final Schema SOURCE_SCHEMA_WITHOUT_KEY =
      Schema.builder().addNullableField("content", STRING).build();

  /** Kind used by the tests that write entities and read them back. */
  private static final String KIND = "writereadtest";

  /** Kind queried by {@link #testReadAllSupportedTypes()}. */
  private static final String KIND_ALL_TYPES = "writereadalltypestest";

  /** Query issued by every SQL read in this class. */
  private static final String SELECT_ALL_STATEMENT = "SELECT * FROM TEST";

  /** Upper bound on how long a test waits for a read pipeline to reach a terminal state. */
  private static final Duration READ_TIMEOUT = Duration.standardMinutes(5);

  @Rule public final TestPipeline writePipeline = TestPipeline.create();
  @Rule public final TestPipeline readPipeline = TestPipeline.create();

  /**
   * Inserts one row with an explicit key through SQL, then selects everything back and checks
   * that the output schema is {@link #SOURCE_SCHEMA} and that the read pipeline finishes.
   */
  @Test
  public void testDataStoreV1SqlWriteRead() {
    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
    createExternalTable(sqlEnv, " `__key__` VARBINARY, \n" + " `content` VARCHAR \n", KIND);

    // Random names keep entities written by separate runs from sharing a key.
    Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
    Key itemKey = makeKey(ancestor, KIND, UUID.randomUUID().toString()).build();
    String insertStatement =
        "INSERT INTO TEST VALUES ( \n" + keyToSqlByteString(itemKey) + ", \n" + "'2000' \n" + ")";
    runInsert(sqlEnv, insertStatement);

    assertSelectAllSucceeds(sqlEnv, SOURCE_SCHEMA);
  }

  /**
   * Same round trip as {@link #testDataStoreV1SqlWriteRead()}, but against a table that does not
   * declare a {@code __key__} column, so the inserted row carries only {@code content}.
   *
   * <p>NOTE(review): how the entity key is assigned in this case is decided by the table
   * implementation and is not visible from this test.
   */
  @Test
  public void testDataStoreV1SqlWriteRead_withoutKey() {
    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
    createExternalTable(sqlEnv, " `content` VARCHAR \n", KIND);

    runInsert(sqlEnv, "INSERT INTO TEST VALUES ( '3000' )");

    assertSelectAllSucceeds(sqlEnv, SOURCE_SCHEMA_WITHOUT_KEY);
  }

  /**
   * Writes a single row with {@link RowToEntity} + {@link DatastoreIO}, reads it back by key with
   * {@link EntityToRow}, and asserts that the row read equals the row written.
   */
  @Test
  public void testWriteRead_viaCoreBeamIO() {
    String projectId = options.getProject();
    Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
    Key itemKey =
        makeKey(ancestor, KIND, UUID.randomUUID().toString())
            .setPartitionId(PartitionId.newBuilder().setProjectId(projectId).build())
            .build();
    Row testWriteRow =
        Row.withSchema(SOURCE_SCHEMA).addValues(itemKey.toByteArray(), "4000").build();

    writePipeline
        .apply(Create.of(testWriteRow).withRowSchema(SOURCE_SCHEMA))
        .apply(RowToEntity.create("__key__", KIND))
        .apply(DatastoreIO.v1().write().withProjectId(projectId));
    writePipeline.run().waitUntilFinish();

    // Restrict the query to the entity written above so other entities of this kind are ignored.
    Query.Builder query = Query.newBuilder();
    query.addKindBuilder().setName(KIND);
    query.setFilter(makeFilter("__key__", Operator.EQUAL, makeValue(itemKey)));
    DatastoreV1.Read read =
        DatastoreIO.v1().read().withProjectId(projectId).withQuery(query.build());

    PCollection<Row> rowsRead =
        readPipeline.apply(read).apply(EntityToRow.create(SOURCE_SCHEMA, "__key__"));
    PAssert.that(rowsRead).containsInAnyOrder(testWriteRow);

    // Bound the wait and check the terminal state, as the other tests in this class do.
    PipelineResult.State state = readPipeline.run().waitUntilFinish(READ_TIMEOUT);
    assertThat(state, equalTo(State.DONE));
  }

  /**
   * Declares a table covering every supported column type over {@link #KIND_ALL_TYPES} and checks
   * that a full select yields the expected schema and finishes.
   *
   * <p>This test writes nothing itself; the kind is presumably pre-populated in the test project
   * (TODO confirm).
   */
  @Test
  public void testReadAllSupportedTypes() {
    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
    final Schema expectedSchema =
        Schema.builder()
            .addNullableField("__key__", VARBINARY)
            .addNullableField("boolean", BOOLEAN)
            .addNullableField("datetime", DATETIME)
            // TODO: flattening of nested fields by Calcite causes some issues.
            /*.addRowField("embeddedentity",
            Schema.builder()
            .addNullableField("property1", STRING)
            .addNullableField("property2", INT64)
            .build())*/
            .addNullableField("floatingnumber", DOUBLE)
            .addNullableField("integer", INT64)
            .addNullableField("primitivearray", FieldType.array(STRING))
            .addNullableField("string", STRING)
            .addNullableField("text", STRING)
            .build();

    String columnDefinitions =
        " `__key__` VARBINARY, \n"
            + " `boolean` BOOLEAN, \n"
            + " `datetime` TIMESTAMP, \n"
            // + " `embeddedentity` ROW(`property1` VARCHAR, `property2` BIGINT), \n"
            + " `floatingnumber` DOUBLE, \n"
            + " `integer` BIGINT, \n"
            + " `primitivearray` ARRAY<VARCHAR>, \n"
            + " `string` VARCHAR, \n"
            + " `text` VARCHAR";
    createExternalTable(sqlEnv, columnDefinitions, KIND_ALL_TYPES);

    assertSelectAllSucceeds(sqlEnv, expectedSchema);
  }

  /**
   * Registers the external table {@code TEST} of type {@code datastoreV1} in {@code sqlEnv}.
   *
   * @param sqlEnv environment in which the DDL is executed
   * @param columnDefinitions text placed verbatim between the parentheses of the CREATE statement
   * @param kind kind appended to the configured project id to form the table location
   */
  private static void createExternalTable(
      BeamSqlEnv sqlEnv, String columnDefinitions, String kind) {
    String projectId = options.getProject();
    String createTableStatement =
        "CREATE EXTERNAL TABLE TEST( \n"
            + columnDefinitions
            + ") \n"
            + "TYPE 'datastoreV1' \n"
            + "LOCATION '"
            + projectId
            + "/"
            + kind
            + "'";
    sqlEnv.executeDdl(createTableStatement);
  }

  /**
   * Applies {@code insertStatement} to {@link #writePipeline} and blocks until that pipeline
   * finishes.
   */
  private void runInsert(BeamSqlEnv sqlEnv, String insertStatement) {
    BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
    writePipeline.run().waitUntilFinish();
  }

  /**
   * Applies {@code SELECT * FROM TEST} to {@link #readPipeline}, asserts that the resulting
   * collection has {@code expectedSchema}, then runs the pipeline and asserts that it reaches
   * {@link State#DONE} within {@link #READ_TIMEOUT}.
   */
  private void assertSelectAllSucceeds(BeamSqlEnv sqlEnv, Schema expectedSchema) {
    PCollection<Row> output =
        BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(SELECT_ALL_STATEMENT));
    assertThat(output.getSchema(), equalTo(expectedSchema));

    PipelineResult.State state = readPipeline.run().waitUntilFinish(READ_TIMEOUT);
    assertThat(state, equalTo(State.DONE));
  }

  /**
   * Renders the serialized form of {@code key} as a SQL binary literal, i.e. {@code X'...'} with
   * the bytes written in base 16.
   */
  private static String keyToSqlByteString(Key key) {
    return "X'" + ByteString.toString(key.toByteArray(), 16) + "'";
  }
}