Add an integration test (IT) that exercises Datastore write/read through the core Beam DatastoreIO transforms, without relying on SQL
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
index c933773..3274846 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
 
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
@@ -28,15 +30,24 @@
 import static org.hamcrest.Matchers.equalTo;
 
 import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.PropertyFilter.Operator;
+import com.google.datastore.v1.Query;
 import java.util.UUID;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
@@ -132,6 +143,36 @@
   }
 
   @Test
+  public void testWriteRead_viaCoreBeamIO() {
+    String projectId = options.getProject();
+    Key ancestor = makeKey(KIND, UUID.randomUUID().toString()).build();
+    Key itemKey =
+        makeKey(ancestor, KIND, UUID.randomUUID().toString())
+            .setPartitionId(PartitionId.newBuilder().setProjectId(projectId).build())
+            .build();
+    Row testWriteRow =
+        Row.withSchema(SOURCE_SCHEMA).addValues(itemKey.toByteArray(), "4000").build();
+
+    writePipeline
+        .apply(Create.of(testWriteRow).withRowSchema(SOURCE_SCHEMA))
+        .apply(RowToEntity.create("__key__", KIND))
+        .apply(DatastoreIO.v1().write().withProjectId(projectId));
+    writePipeline.run().waitUntilFinish();
+
+    Query.Builder query = Query.newBuilder();
+    query.addKindBuilder().setName(KIND);
+    query.setFilter(makeFilter("__key__", Operator.EQUAL, makeValue(itemKey)));
+
+    DatastoreV1.Read read =
+        DatastoreIO.v1().read().withProjectId(projectId).withQuery(query.build());
+    PCollection<Row> rowsRead =
+        readPipeline.apply(read).apply(EntityToRow.create(SOURCE_SCHEMA, "__key__"));
+
+    PAssert.that(rowsRead).containsInAnyOrder(testWriteRow);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
   public void testReadAllSupportedTypes() {
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
     String projectId = options.getProject();