blob: 4bcc4733c2bf34bb481a333c81728b1fef2a4114 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
import static com.google.datastore.v1.Value.ValueTypeCase.ARRAY_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.BLOB_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.BOOLEAN_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.DOUBLE_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.ENTITY_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.INTEGER_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.KEY_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.NULL_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.STRING_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.TIMESTAMP_VALUE;
import static com.google.datastore.v1.Value.ValueTypeCase.VALUETYPE_NOT_SET;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.Value.ValueTypeCase;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable {
public static final String KEY_FIELD_PROPERTY = "keyField";
@VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__";
private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreV1Table.class);
// Should match: `projectId/kind`.
private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
@VisibleForTesting final String keyField;
@VisibleForTesting final String projectId;
@VisibleForTesting final String kind;
/**
 * Builds a DataStore-backed table from a {@code Table} definition.
 *
 * <p>The key field defaults to {@code __key__} unless overridden via the {@code keyField} table
 * property. The location string must have the form {@code projectId/kind}.
 *
 * @param table table definition carrying schema, properties, and location.
 * @throws IllegalArgumentException if the key-field property is empty, the location is missing,
 *     or the location does not match {@code projectId/kind}.
 */
public DataStoreV1Table(Table table) {
  super(table.getSchema());
  // TODO: allow users to specify a name of the field to store a key value via TableProperties.
  JSONObject properties = table.getProperties();
  if (properties.containsKey(KEY_FIELD_PROPERTY)) {
    String field = properties.getString(KEY_FIELD_PROPERTY);
    checkArgument(
        field != null && !field.isEmpty(), "'%s' property cannot be null.", KEY_FIELD_PROPERTY);
    keyField = field;
  } else {
    keyField = DEFAULT_KEY_FIELD;
  }
  // TODO: allow users to specify a namespace in a location string.
  String location = table.getLocation();
  // Use checkArgument rather than `assert`: assertions are disabled at runtime by default
  // (no -ea flag), so an `assert` would silently pass a null location to the matcher and
  // surface as an unhelpful NullPointerException instead.
  checkArgument(location != null, "DataStoreV1 location must be set: 'projectId/kind'");
  Matcher matcher = locationPattern.matcher(location);
  checkArgument(
      matcher.matches(),
      "DataStoreV1 location must be in the following format: 'projectId/kind'");
  this.projectId = matcher.group("projectId");
  this.kind = matcher.group("kind");
}
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
  // Query every entity of the configured kind in the configured project.
  Query.Builder q = Query.newBuilder();
  q.addKindBuilder().setName(kind);
  Query query = q.build();
  DatastoreV1.Read readInstance =
      DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
  // Apply the transform through the pipeline instead of invoking expand() directly:
  // expand() is an internal composition hook, and bypassing apply() skips the pipeline's
  // transform registration, naming, and validation.
  return begin
      .apply(readInstance)
      .apply(EntityToRow.create(getSchema(), keyField))
      .setRowSchema(schema);
}
@Override
public POutput buildIOWriter(PCollection<Row> input) {
  // Map each Row to a DataStore Entity (keys are read from, or generated for, `keyField`),
  // then write the entities to the configured project.
  RowToEntity rowConverter = RowToEntity.create(getSchema(), keyField, kind);
  PCollection<Entity> entities = input.apply(rowConverter);
  return entities.apply(DatastoreIO.v1().write().withProjectId(projectId));
}
@Override
public IsBounded isBounded() {
  // DataStore reads are one-shot batch queries, so the source is always bounded.
  return IsBounded.BOUNDED;
}
@Override
public BeamTableStatistics getTableStatistics(PipelineOptions options) {
  // Ask DataStore for the entity count of this kind; a negative result signals that
  // no estimate is available.
  long numEntities =
      DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, null);
  return numEntities < 0
      ? BeamTableStatistics.BOUNDED_UNKNOWN
      : BeamTableStatistics.createBoundedTableStatistics((double) numEntities);
}
/**
 * A {@code PTransform} that converts DataStore {@code Entity} objects to Beam {@code Row}s
 * according to the supplied schema. The entity's key is exposed as serialized proto bytes under
 * {@code keyField}.
 */
public static class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> {
  private final Schema schema;
  private final String keyField;

  private EntityToRow(Schema schema, String keyField) {
    this.schema = schema;
    this.keyField = keyField;
    // Keys are stored as the serialized bytes of the DataStore Key proto, so the destination
    // field (when present in the schema) must be BYTES (SQL VARBINARY).
    if (schema.getFieldNames().contains(keyField)
        && !schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
      throw new IllegalStateException(
          "Field `"
              + keyField
              + "` should be of type `VARBINARY`. Please change the type or specify a field to store the KEY value in via TableProperties.");
    }
  }

  /** Creates a converter that stores the entity key under the default field {@code __key__}. */
  public static EntityToRow create(Schema schema) {
    LOGGER.info(
        "VARBINARY field to store KEY was not specified, using default value: `"
            + DEFAULT_KEY_FIELD
            + "`.");
    return new EntityToRow(schema, DEFAULT_KEY_FIELD);
  }

  /** Creates a converter that stores the entity key under the user-specified {@code keyField}. */
  public static EntityToRow create(Schema schema, String keyField) {
    LOGGER.info("VARBINARY field to store KEY was specified, using value: `" + keyField + "`.");
    return new EntityToRow(schema, keyField);
  }

  @Override
  public PCollection<Row> expand(PCollection<Entity> input) {
    return input.apply(ParDo.of(new EntityToRowConverter()));
  }

  /** Per-element conversion of an {@code Entity} into a {@code Row} matching {@code schema}. */
  @VisibleForTesting
  class EntityToRowConverter extends DoFn<Entity, Row> {
    // Primitive-type conversions keyed by the DataStore value case. Intersection casts keep
    // the lambdas serializable so the DoFn can be shipped to workers.
    private final ImmutableMap<ValueTypeCase, Function<Value, Object>> MAPPING_FUNCTIONS =
        ImmutableMap.<ValueTypeCase, Function<Value, Object>>builder()
            .put(NULL_VALUE, (Function<Value, Object> & Serializable) v -> null)
            .put(BOOLEAN_VALUE, (Function<Value, Object> & Serializable) Value::getBooleanValue)
            .put(INTEGER_VALUE, (Function<Value, Object> & Serializable) Value::getIntegerValue)
            .put(DOUBLE_VALUE, (Function<Value, Object> & Serializable) Value::getDoubleValue)
            .put(
                TIMESTAMP_VALUE,
                (Function<Value, Object> & Serializable)
                    v -> {
                      // TODO: DataStore may not support milliseconds.
                      com.google.protobuf.Timestamp time = v.getTimestampValue();
                      // getNanos() is in nanoseconds: divide by 1,000,000 (not 1,000) to get
                      // the millisecond remainder; dividing by 1,000 yields microseconds and
                      // corrupts the sub-second part of the timestamp.
                      long millis = time.getSeconds() * 1000 + time.getNanos() / 1000000;
                      return Instant.ofEpochMilli(millis).toDateTime();
                    })
            .put(STRING_VALUE, (Function<Value, Object> & Serializable) Value::getStringValue)
            // https://cloud.google.com/datastore/docs/concepts/entities.
            .put(
                KEY_VALUE,
                (Function<Value, Object> & Serializable) v -> v.getKeyValue().toByteArray())
            .put(
                BLOB_VALUE,
                (Function<Value, Object> & Serializable) v -> v.getBlobValue().toByteArray())
            .put(VALUETYPE_NOT_SET, (Function<Value, Object> & Serializable) v -> null)
            .build();
    // Fallback that raises a descriptive error for unsupported value cases.
    private final Function<Value, Object> MAPPING_NOT_FOUND =
        (Function<Value, Object> & Serializable)
            v -> {
              throw new IllegalStateException(
                  "No conversion exists from type: "
                      + v.getValueTypeCase().name()
                      + " to Beam type. Supported types are: "
                      + Arrays.toString(MAPPING_FUNCTIONS.keySet().toArray()));
            };

    @DoFn.ProcessElement
    public void processElement(ProcessContext context) {
      Entity entity = context.element();
      ImmutableMap.Builder<String, Value> mapBuilder = ImmutableMap.builder();
      // NOTE(review): if an entity has a property named exactly like keyField, the
      // ImmutableMap builder will reject the duplicate key at build() — confirm this
      // cannot happen for the supported schemas.
      mapBuilder.put(keyField, makeValue(entity.getKey()).build());
      mapBuilder.putAll(entity.getPropertiesMap());
      context.output(extractRowFromProperties(schema, mapBuilder.build()));
    }

    /**
     * Converts a single DataStore {@code Value} to the Java object expected by the Beam field
     * type, recursing into nested entities (ROW) and arrays (collections).
     *
     * @param currentFieldType Beam type of the destination field.
     * @param val DataStore value, or {@code null} when the entity lacks the property.
     */
    private Object convertValueToObject(FieldType currentFieldType, Value val) {
      if (val == null) {
        // The entity has no property for this schema field. Map it to null instead of
        // letting getValueTypeCase() throw an opaque NullPointerException; non-nullable
        // fields will then fail at Row construction with a clear message.
        return null;
      }
      ValueTypeCase typeCase = val.getValueTypeCase();
      if (typeCase.equals(ENTITY_VALUE)) {
        // Recursive mapping for row type.
        Schema rowSchema = currentFieldType.getRowSchema();
        assert rowSchema != null;
        Entity entity = val.getEntityValue();
        return extractRowFromProperties(rowSchema, entity.getPropertiesMap());
      } else if (typeCase.equals(ARRAY_VALUE)) {
        // Recursive mapping for collection type.
        FieldType elementType = currentFieldType.getCollectionElementType();
        List<Value> valueList = val.getArrayValue().getValuesList();
        return valueList.stream()
            .map(v -> convertValueToObject(elementType, v))
            .collect(Collectors.toList());
      }
      // Mapping for primitive types.
      return MAPPING_FUNCTIONS.getOrDefault(typeCase, MAPPING_NOT_FOUND).apply(val);
    }

    /** Builds a {@code Row} by looking up each schema field's value in the property map. */
    private Row extractRowFromProperties(Schema schema, Map<String, Value> values) {
      Row.Builder builder = Row.withSchema(schema);
      // It is not a guarantee that the values will be in the same order as the schema.
      // Maybe metadata:
      // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
      // TODO: figure out in what order the elements are in (without relying on Beam schema).
      for (Schema.Field field : schema.getFields()) {
        Value val = values.get(field.getName());
        builder.addValue(convertValueToObject(field.getType(), val));
      }
      return builder.build();
    }
  }
}
/**
 * A {@code PTransform} that converts Beam {@code Row}s to DataStore {@code Entity} objects.
 * When the row contains {@code keyField}, the entity key is deserialized from its bytes;
 * otherwise a random key is generated via {@code keySupplier}.
 */
public static class RowToEntity extends PTransform<PCollection<Row>, PCollection<Entity>> {
  private final Supplier<String> keySupplier;
  private final Schema schema;
  private final String kind;
  private final String keyField;

  private RowToEntity(Supplier<String> keySupplier, Schema schema, String kind, String keyField) {
    this.keySupplier = keySupplier;
    this.schema = schema;
    this.kind = kind;
    this.keyField = keyField;
    // A user-supplied key must arrive as the serialized bytes of the Key proto, so the
    // field (when present in the schema) must be BYTES (SQL VARBINARY).
    if (schema.getFieldNames().contains(keyField)
        && !schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
      throw new IllegalStateException(
          "Field `"
              + keyField
              + "` should be of type `VARBINARY`. Please change the type or specify a field to write the KEY value from via TableProperties.");
    }
  }

  @Override
  public PCollection<Entity> expand(PCollection<Row> input) {
    return input.apply(ParDo.of(new RowToEntityConverter()));
  }

  /** Creates a converter that reads keys from {@code keyField}, generating UUIDs when absent. */
  public static RowToEntity create(Schema schema, String keyField, String kind) {
    // NOTE(review): the previous message claimed the default key field was in use, but this
    // factory always works with the caller-supplied `keyField`; log the actual field instead.
    LOGGER.info("Using VARBINARY field `" + keyField + "` for the entity KEY.");
    return new RowToEntity(
        (Supplier<String> & Serializable) () -> UUID.randomUUID().toString(),
        schema,
        kind,
        keyField);
  }

  @VisibleForTesting
  static RowToEntity createTest(String keyString, Schema schema, String kind) {
    return new RowToEntity(
        (Supplier<String> & Serializable) () -> keyString, schema, kind, DEFAULT_KEY_FIELD);
  }

  /** Per-element conversion of a {@code Row} into a keyed DataStore {@code Entity}. */
  @VisibleForTesting
  class RowToEntityConverter extends DoFn<Row, Entity> {
    // Java-type-to-Value conversions. Intersection casts keep the lambdas serializable so the
    // DoFn can be shipped to workers. NOTE(review): only RowWithStorage/RowWithGetters and
    // ArrayList are registered; other Row/Collection implementations fall through to
    // MAPPING_NOT_FOUND — confirm these are the only runtime classes produced upstream.
    private final ImmutableMap<Class<?>, Function<?, Value>> MAPPING_FUNCTIONS =
        ImmutableMap.<Class<?>, Function<?, Value>>builder()
            .put(
                Boolean.class,
                (Function<Boolean, Value> & Serializable) v -> makeValue(v).build())
            .put(Byte.class, (Function<Byte, Value> & Serializable) v -> makeValue(v).build())
            .put(Long.class, (Function<Long, Value> & Serializable) v -> makeValue(v).build())
            .put(Short.class, (Function<Short, Value> & Serializable) v -> makeValue(v).build())
            .put(
                Integer.class,
                (Function<Integer, Value> & Serializable) v -> makeValue(v).build())
            .put(Double.class, (Function<Double, Value> & Serializable) v -> makeValue(v).build())
            .put(Float.class, (Function<Float, Value> & Serializable) v -> makeValue(v).build())
            .put(String.class, (Function<String, Value> & Serializable) v -> makeValue(v).build())
            .put(
                Instant.class,
                (Function<Instant, Value> & Serializable) v -> makeValue(v.toDate()).build())
            .put(
                byte[].class,
                (Function<byte[], Value> & Serializable)
                    v -> makeValue(ByteString.copyFrom(v)).build())
            .put(RowWithStorage.class, (Function<Row, Value> & Serializable) this::mapRowToValue)
            .put(RowWithGetters.class, (Function<Row, Value> & Serializable) this::mapRowToValue)
            .put(
                ArrayList.class,
                (Function<Collection<Object>, Value> & Serializable) this::mapCollectionToValue)
            .build();
    // Fallback that raises a descriptive error for unsupported Java types.
    private final Function<Object, Value> MAPPING_NOT_FOUND =
        (Function<Object, Value> & Serializable)
            v -> {
              throw new IllegalStateException(
                  "No conversion exists from type: "
                      + v.getClass()
                      + " to DataStore Value. Supported types are: "
                      + Arrays.toString(MAPPING_FUNCTIONS.keySet().toArray()));
            };

    @DoFn.ProcessElement
    public void processElement(ProcessContext context) {
      Row row = context.element();
      // Exclude the key field from the entity properties: it becomes the entity Key itself.
      Schema schemaWithoutKeyField =
          Schema.builder()
              .addFields(
                  schema.getFields().stream()
                      .filter(field -> !field.getName().equals(keyField))
                      .collect(Collectors.toList()))
              .build();
      Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row);
      entityBuilder.setKey(constructKeyFromRow(row));
      context.output(entityBuilder.build());
    }

    /**
     * Converts an entire {@code Row} to an appropriate DataStore {@code Entity.Builder}.
     *
     * @param row {@code Row} to convert.
     * @return resulting {@code Entity.Builder}.
     */
    private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
      Entity.Builder entityBuilder = Entity.newBuilder();
      for (Schema.Field field : schema.getFields()) {
        Value val = mapObjectToValue(row.getValue(field.getName()));
        entityBuilder.putProperties(field.getName(), val);
      }
      return entityBuilder;
    }

    /**
     * Create a random key for a {@code Row} without a keyField or use a user-specified key by
     * parsing it from byte array when keyField is set.
     *
     * @param row {@code Row} to construct a key for.
     * @return resulting {@code Key}.
     * @throws IllegalStateException when the key bytes cannot be parsed as a Key proto.
     */
    private Key constructKeyFromRow(Row row) {
      if (!row.getSchema().getFieldNames().contains(keyField)) {
        // When key field is not present - use key supplier to generate a random one.
        return makeKey(kind, keySupplier.get()).build();
      }
      byte[] keyBytes = row.getBytes(keyField);
      try {
        return Key.parseFrom(keyBytes);
      } catch (InvalidProtocolBufferException e) {
        // Preserve the cause so the malformed bytes can be diagnosed from the stack trace.
        throw new IllegalStateException("Failed to parse DataStore key from bytes.", e);
      }
    }

    /**
     * A mapping function to handle conversion of Collections (such as {@code ArrayList}) to
     * DataStore {@code Value}.
     *
     * @param collection {@code Collection} to convert.
     * @return resulting {@code Value}.
     */
    private Value mapCollectionToValue(Collection<Object> collection) {
      List<Value> arrayValues =
          collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
      return makeValue(arrayValues).build();
    }

    /**
     * A mapping function to handle conversion of nested {@code Row} to DataStore {@code Value}.
     *
     * @param row {@code Row} to convert.
     * @return resulting {@code Value}.
     */
    private Value mapRowToValue(Row row) {
      return makeValue(constructEntityFromRow(row.getSchema(), row)).build();
    }

    /**
     * Converts a {@code Row} value to an appropriate DataStore {@code Value} object.
     *
     * @param value {@code Row} value to convert; {@code null} maps to an empty {@code Value}.
     * @throws IllegalStateException when no mapping function for object of given type exists.
     * @return resulting {@code Value}.
     */
    private Value mapObjectToValue(Object value) {
      if (value == null) {
        return Value.newBuilder().build();
      }
      return ((Function<Object, Value>)
              MAPPING_FUNCTIONS.getOrDefault(value.getClass(), MAPPING_NOT_FOUND))
          .apply(value);
    }
  }
}
}