Add IT that does not rely on SQL
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
index c933773..3274846 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
@@ -28,15 +30,24 @@
import static org.hamcrest.Matchers.equalTo;
import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.PropertyFilter.Operator;
+import com.google.datastore.v1.Query;
import java.util.UUID;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
@@ -132,6 +143,36 @@
}
@Test
+ public void testWriteRead_viaCoreBeamIO() {
+ String projectId = options.getProject();
+ Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
+ Key itemKey =
+ makeKey(ancestor, KIND, UUID.randomUUID().toString())
+ .setPartitionId(PartitionId.newBuilder().setProjectId(projectId).build())
+ .build();
+ Row testWriteRow =
+ Row.withSchema(SOURCE_SCHEMA).addValues(itemKey.toByteArray(), "4000").build();
+
+ writePipeline
+ .apply(Create.of(testWriteRow).withRowSchema(SOURCE_SCHEMA))
+ .apply(RowToEntity.create("__key__", KIND))
+ .apply(DatastoreIO.v1().write().withProjectId(projectId));
+ writePipeline.run().waitUntilFinish();
+
+ Query.Builder query = Query.newBuilder();
+ query.addKindBuilder().setName(KIND);
+ query.setFilter(makeFilter("__key__", Operator.EQUAL, makeValue(itemKey)));
+
+ DatastoreV1.Read read =
+ DatastoreIO.v1().read().withProjectId(projectId).withQuery(query.build());
+ PCollection<Row> rowsRead =
+ readPipeline.apply(read).apply(EntityToRow.create(SOURCE_SCHEMA, "__key__"));
+
+ PAssert.that(rowsRead).containsInAnyOrder(testWriteRow);
+ readPipeline.run().waitUntilFinish();
+ }
+
+ @Test
public void testReadAllSupportedTypes() {
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
String projectId = options.getProject();