| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.meta.provider.datastore; |
| |
| import static com.google.datastore.v1.Value.ValueTypeCase.ARRAY_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.BLOB_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.BOOLEAN_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.DOUBLE_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.ENTITY_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.INTEGER_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.KEY_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.NULL_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.STRING_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.TIMESTAMP_VALUE; |
| import static com.google.datastore.v1.Value.ValueTypeCase.VALUETYPE_NOT_SET; |
| import static com.google.datastore.v1.client.DatastoreHelper.makeKey; |
| import static com.google.datastore.v1.client.DatastoreHelper.makeValue; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.alibaba.fastjson.JSONObject; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.datastore.v1.Entity; |
| import com.google.datastore.v1.Key; |
| import com.google.datastore.v1.Query; |
| import com.google.datastore.v1.Value; |
| import com.google.datastore.v1.Value.ValueTypeCase; |
| import com.google.protobuf.ByteString; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; |
| import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; |
| import org.apache.beam.sdk.extensions.sql.meta.Table; |
| import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; |
| import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.schemas.Schema; |
| import org.apache.beam.sdk.schemas.Schema.FieldType; |
| import org.apache.beam.sdk.schemas.Schema.TypeName; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollection.IsBounded; |
| import org.apache.beam.sdk.values.POutput; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.sdk.values.RowWithGetters; |
| import org.apache.beam.sdk.values.RowWithStorage; |
| import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
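/**
 * {@code DataStoreV1Table} maps a single Cloud Datastore {@code kind} to a Beam SQL table.
 *
 * <p>A minimal usage sketch. The table name, schema, and {@code TYPE} string below are
 * illustrative only (the {@code TYPE} value must match whatever the registered
 * {@code TableProvider} reports), not guaranteed by this class:
 *
 * <pre>{@code
 * CREATE EXTERNAL TABLE users (
 *   __key__ VARBINARY,
 *   name VARCHAR
 * )
 * TYPE 'datastoreV1'
 * LOCATION 'my-project/users'
 * TBLPROPERTIES '{"keyField": "__key__"}'
 * }</pre>
 */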
| public class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable { |
| public static final String KEY_FIELD_PROPERTY = "keyField"; |
| @VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__"; |
| private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreV1Table.class); |
| // Should match: `projectId/kind`. |
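  // e.g. an illustrative location string that matches: "my-project/Users".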
| private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)"); |
| @VisibleForTesting final String keyField; |
| @VisibleForTesting final String projectId; |
| @VisibleForTesting final String kind; |
| |
| public DataStoreV1Table(Table table) { |
| super(table.getSchema()); |
| |
    JSONObject properties = table.getProperties();
    if (properties.containsKey(KEY_FIELD_PROPERTY)) {
      String field = properties.getString(KEY_FIELD_PROPERTY);
      checkArgument(
          field != null && !field.isEmpty(),
          "'%s' property cannot be null or empty.",
          KEY_FIELD_PROPERTY);
| keyField = field; |
| } else { |
| keyField = DEFAULT_KEY_FIELD; |
| } |
| // TODO: allow users to specify a namespace in a location string. |
| String location = table.getLocation(); |
    checkArgument(location != null, "DataStoreV1 table location cannot be null.");
| Matcher matcher = locationPattern.matcher(location); |
| checkArgument( |
| matcher.matches(), |
| "DataStoreV1 location must be in the following format: 'projectId/kind'"); |
| |
| this.projectId = matcher.group("projectId"); |
| this.kind = matcher.group("kind"); |
| } |
| |
| @Override |
| public PCollection<Row> buildIOReader(PBegin begin) { |
| Query.Builder q = Query.newBuilder(); |
| q.addKindBuilder().setName(kind); |
| Query query = q.build(); |
| |
| DatastoreV1.Read readInstance = |
| DatastoreIO.v1().read().withProjectId(projectId).withQuery(query); |
| |
    // Apply the read through the pipeline rather than calling expand() directly,
    // so the transform is registered in the pipeline graph.
    PCollection<Entity> readEntities = begin.apply(readInstance);

    return readEntities
        .apply(EntityToRow.create(getSchema(), keyField))
        .setRowSchema(getSchema());
| } |
| |
| @Override |
| public POutput buildIOWriter(PCollection<Row> input) { |
| return input |
| .apply(RowToEntity.create(getSchema(), keyField, kind)) |
| .apply(DatastoreIO.v1().write().withProjectId(projectId)); |
| } |
| |
| @Override |
| public IsBounded isBounded() { |
| return IsBounded.BOUNDED; |
| } |
| |
| @Override |
| public BeamTableStatistics getTableStatistics(PipelineOptions options) { |
| long count = |
| DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, null); |
| |
| if (count < 0) { |
| return BeamTableStatistics.BOUNDED_UNKNOWN; |
| } |
| |
| return BeamTableStatistics.createBoundedTableStatistics((double) count); |
| } |
| |
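  /**
   * A {@code PTransform} that converts Datastore {@code Entity} objects to Beam {@code Row}s
   * following the given schema. The entity key is stored as bytes in {@code keyField}.
   *
   * <p>A usage sketch; the schema, field names, and {@code entities} collection below are
   * illustrative:
   *
   * <pre>{@code
   * Schema schema =
   *     Schema.builder().addByteArrayField("__key__").addStringField("name").build();
   * PCollection<Row> rows = entities.apply(EntityToRow.create(schema, "__key__"));
   * }</pre>
   */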
| public static class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> { |
| private final Schema schema; |
| private final String keyField; |
| |
| private EntityToRow(Schema schema, String keyField) { |
| this.schema = schema; |
| this.keyField = keyField; |
| |
| if (schema.getFieldNames().contains(keyField) |
| && !schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) { |
        throw new IllegalStateException(
            "Field `"
                + keyField
                + "` should be of type `VARBINARY`. Please change the type or specify a field to"
                + " store the KEY value in via TableProperties.");
| } |
| } |
| |
| public static EntityToRow create(Schema schema) { |
| LOGGER.info( |
| "VARBINARY field to store KEY was not specified, using default value: `" |
| + DEFAULT_KEY_FIELD |
| + "`."); |
| return new EntityToRow(schema, DEFAULT_KEY_FIELD); |
| } |
| |
| public static EntityToRow create(Schema schema, String keyField) { |
| LOGGER.info("VARBINARY field to store KEY was specified, using value: `" + keyField + "`."); |
| return new EntityToRow(schema, keyField); |
| } |
| |
| @Override |
| public PCollection<Row> expand(PCollection<Entity> input) { |
| return input.apply(ParDo.of(new EntityToRowConverter())); |
| } |
| |
| @VisibleForTesting |
| class EntityToRowConverter extends DoFn<Entity, Row> { |
| private final ImmutableMap<ValueTypeCase, Function<Value, Object>> MAPPING_FUNCTIONS = |
| ImmutableMap.<ValueTypeCase, Function<Value, Object>>builder() |
| .put(NULL_VALUE, (Function<Value, Object> & Serializable) v -> null) |
| .put(BOOLEAN_VALUE, (Function<Value, Object> & Serializable) Value::getBooleanValue) |
| .put(INTEGER_VALUE, (Function<Value, Object> & Serializable) Value::getIntegerValue) |
| .put(DOUBLE_VALUE, (Function<Value, Object> & Serializable) Value::getDoubleValue) |
| .put( |
| TIMESTAMP_VALUE, |
| (Function<Value, Object> & Serializable) |
| v -> { |
                      com.google.protobuf.Timestamp time = v.getTimestampValue();
                      // Datastore timestamps carry a nanosecond field; Joda Instant is
                      // millisecond precision, so truncate nanos to millis.
                      long millis = time.getSeconds() * 1000 + time.getNanos() / 1000000;
| return Instant.ofEpochMilli(millis).toDateTime(); |
| }) |
| .put(STRING_VALUE, (Function<Value, Object> & Serializable) Value::getStringValue) |
| // https://cloud.google.com/datastore/docs/concepts/entities. |
| .put( |
| KEY_VALUE, |
| (Function<Value, Object> & Serializable) v -> v.getKeyValue().toByteArray()) |
| .put( |
| BLOB_VALUE, |
| (Function<Value, Object> & Serializable) v -> v.getBlobValue().toByteArray()) |
| .put(VALUETYPE_NOT_SET, (Function<Value, Object> & Serializable) v -> null) |
| .build(); |
| private final Function<Value, Object> MAPPING_NOT_FOUND = |
| (Function<Value, Object> & Serializable) |
| v -> { |
| throw new IllegalStateException( |
| "No conversion exists from type: " |
| + v.getValueTypeCase().name() |
| + " to Beam type. Supported types are: " |
| + Arrays.toString(MAPPING_FUNCTIONS.keySet().toArray())); |
| }; |
| |
| @DoFn.ProcessElement |
| public void processElement(ProcessContext context) { |
| Entity entity = context.element(); |
| ImmutableMap.Builder<String, Value> mapBuilder = ImmutableMap.builder(); |
| mapBuilder.put(keyField, makeValue(entity.getKey()).build()); |
| mapBuilder.putAll(entity.getPropertiesMap()); |
| |
| context.output(extractRowFromProperties(schema, mapBuilder.build())); |
| } |
| |
      private Object convertValueToObject(FieldType currentFieldType, Value val) {
        if (val == null) {
          // The property is absent from the entity; surface it as a null row value instead of
          // failing with an NPE below. Row construction will still reject nulls for
          // non-nullable fields.
          return null;
        }
        ValueTypeCase typeCase = val.getValueTypeCase();
| if (typeCase.equals(ENTITY_VALUE)) { |
| // Recursive mapping for row type. |
| Schema rowSchema = currentFieldType.getRowSchema(); |
| assert rowSchema != null; |
| Entity entity = val.getEntityValue(); |
| return extractRowFromProperties(rowSchema, entity.getPropertiesMap()); |
| } else if (typeCase.equals(ARRAY_VALUE)) { |
| // Recursive mapping for collection type. |
| FieldType elementType = currentFieldType.getCollectionElementType(); |
| List<Value> valueList = val.getArrayValue().getValuesList(); |
| return valueList.stream() |
| .map(v -> convertValueToObject(elementType, v)) |
| .collect(Collectors.toList()); |
| } |
| |
| // Mapping for primitive types. |
| return MAPPING_FUNCTIONS.getOrDefault(typeCase, MAPPING_NOT_FOUND).apply(val); |
| } |
| |
| private Row extractRowFromProperties(Schema schema, Map<String, Value> values) { |
| Row.Builder builder = Row.withSchema(schema); |
        // The entity's properties are not guaranteed to be in the same order as the schema
        // fields, so look each one up by name. Metadata queries might help determine ordering:
        // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
        // TODO: figure out what order the elements are in (without relying on Beam schema).
| for (Schema.Field field : schema.getFields()) { |
| Value val = values.get(field.getName()); |
| builder.addValue(convertValueToObject(field.getType(), val)); |
| } |
| return builder.build(); |
| } |
| } |
| } |
| |
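  /**
   * A {@code PTransform} that converts Beam {@code Row}s to Datastore {@code Entity} objects.
   * When {@code keyField} is present in the row schema, its bytes are parsed as the entity key;
   * otherwise a random UUID-based key is generated.
   *
   * <p>A usage sketch; the schema, kind, field names, and {@code rows} collection below are
   * illustrative:
   *
   * <pre>{@code
   * PCollection<Entity> entities =
   *     rows.apply(RowToEntity.create(schema, "__key__", "Users"));
   * }</pre>
   */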
| public static class RowToEntity extends PTransform<PCollection<Row>, PCollection<Entity>> { |
| private final Supplier<String> keySupplier; |
| private final Schema schema; |
| private final String kind; |
| private final String keyField; |
| |
| private RowToEntity(Supplier<String> keySupplier, Schema schema, String kind, String keyField) { |
| this.keySupplier = keySupplier; |
| this.schema = schema; |
| this.kind = kind; |
| this.keyField = keyField; |
| |
| if (schema.getFieldNames().contains(keyField) |
| && !schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) { |
        throw new IllegalStateException(
            "Field `"
                + keyField
                + "` should be of type `VARBINARY`. Please change the type or specify a field to"
                + " write the KEY value from via TableProperties.");
| } |
| } |
| |
| @Override |
| public PCollection<Entity> expand(PCollection<Row> input) { |
| return input.apply(ParDo.of(new RowToEntityConverter())); |
| } |
| |
    public static RowToEntity create(Schema schema, String keyField, String kind) {
      LOGGER.info(
          "Entity KEY will be read from VARBINARY field: `"
              + keyField
              + "` (a random key is generated when the field is absent from a Row).");
| return new RowToEntity( |
| (Supplier<String> & Serializable) () -> UUID.randomUUID().toString(), |
| schema, |
| kind, |
| keyField); |
| } |
| |
| @VisibleForTesting |
| static RowToEntity createTest(String keyString, Schema schema, String kind) { |
| return new RowToEntity( |
| (Supplier<String> & Serializable) () -> keyString, schema, kind, DEFAULT_KEY_FIELD); |
| } |
| |
| @VisibleForTesting |
| class RowToEntityConverter extends DoFn<Row, Entity> { |
      private final ImmutableMap<Class<?>, Function<?, Value>> MAPPING_FUNCTIONS =
          ImmutableMap.<Class<?>, Function<?, Value>>builder()
| .put( |
| Boolean.class, |
| (Function<Boolean, Value> & Serializable) v -> makeValue(v).build()) |
| .put(Byte.class, (Function<Byte, Value> & Serializable) v -> makeValue(v).build()) |
| .put(Long.class, (Function<Long, Value> & Serializable) v -> makeValue(v).build()) |
| .put(Short.class, (Function<Short, Value> & Serializable) v -> makeValue(v).build()) |
| .put( |
| Integer.class, |
| (Function<Integer, Value> & Serializable) v -> makeValue(v).build()) |
| .put(Double.class, (Function<Double, Value> & Serializable) v -> makeValue(v).build()) |
| .put(Float.class, (Function<Float, Value> & Serializable) v -> makeValue(v).build()) |
| .put(String.class, (Function<String, Value> & Serializable) v -> makeValue(v).build()) |
| .put( |
| Instant.class, |
| (Function<Instant, Value> & Serializable) v -> makeValue(v.toDate()).build()) |
| .put( |
| byte[].class, |
| (Function<byte[], Value> & Serializable) |
| v -> makeValue(ByteString.copyFrom(v)).build()) |
| .put(RowWithStorage.class, (Function<Row, Value> & Serializable) this::mapRowToValue) |
| .put(RowWithGetters.class, (Function<Row, Value> & Serializable) this::mapRowToValue) |
| .put( |
| ArrayList.class, |
| (Function<Collection<Object>, Value> & Serializable) this::mapCollectionToValue) |
| .build(); |
| private final Function<Object, Value> MAPPING_NOT_FOUND = |
| (Function<Object, Value> & Serializable) |
| v -> { |
| throw new IllegalStateException( |
| "No conversion exists from type: " |
| + v.getClass() |
| + " to DataStove Value. Supported types are: " |
| + Arrays.toString(MAPPING_FUNCTIONS.keySet().toArray())); |
| }; |
| |
| @DoFn.ProcessElement |
| public void processElement(ProcessContext context) { |
| Row row = context.element(); |
| |
| Schema schemaWithoutKeyField = |
| Schema.builder() |
| .addFields( |
| schema.getFields().stream() |
| .filter(field -> !field.getName().equals(keyField)) |
| .collect(Collectors.toList())) |
| .build(); |
| Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row); |
| entityBuilder.setKey(constructKeyFromRow(row)); |
| |
| context.output(entityBuilder.build()); |
| } |
| |
| /** |
| * Converts an entire {@code Row} to an appropriate DataStore {@code Entity.Builder}. |
| * |
| * @param row {@code Row} to convert. |
| * @return resulting {@code Entity.Builder}. |
| */ |
| private Entity.Builder constructEntityFromRow(Schema schema, Row row) { |
| Entity.Builder entityBuilder = Entity.newBuilder(); |
| for (Schema.Field field : schema.getFields()) { |
| Value val = mapObjectToValue(row.getValue(field.getName())); |
| entityBuilder.putProperties(field.getName(), val); |
| } |
| return entityBuilder; |
| } |
| |
| /** |
| * Create a random key for a {@code Row} without a keyField or use a user-specified key by |
| * parsing it from byte array when keyField is set. |
| * |
| * @param row {@code Row} to construct a key for. |
| * @return resulting {@code Key}. |
| */ |
| private Key constructKeyFromRow(Row row) { |
| if (!row.getSchema().getFieldNames().contains(keyField)) { |
| // When key field is not present - use key supplier to generate a random one. |
| return makeKey(kind, keySupplier.get()).build(); |
| } |
| byte[] keyBytes = row.getBytes(keyField); |
| try { |
| return Key.parseFrom(keyBytes); |
| } catch (InvalidProtocolBufferException e) { |
| throw new IllegalStateException("Failed to parse DataStore key from bytes."); |
| } |
| } |
| |
| /** |
| * A mapping function to handle conversion of Collections (such as {@code ArrayList}) to |
| * DataStore {@code Value}. |
| * |
| * @param collection {@code Collection} to convert. |
| * @return resulting {@code Value}. |
| */ |
| private Value mapCollectionToValue(Collection<Object> collection) { |
| List<Value> arrayValues = |
| collection.stream().map(this::mapObjectToValue).collect(Collectors.toList()); |
| return makeValue(arrayValues).build(); |
| } |
| |
| /** |
| * A mapping function to handle conversion of nested {@code Row} to DataStore {@code Value}. |
| * |
| * @param row {@code Row} to convert. |
| * @return resulting {@code Value}. |
| */ |
| private Value mapRowToValue(Row row) { |
| return makeValue(constructEntityFromRow(row.getSchema(), row)).build(); |
| } |
| |
| /** |
| * Converts a {@code Row} value to an appropriate DataStore {@code Value} object. |
| * |
| * @param value {@code Row} value to convert. |
| * @throws IllegalStateException when no mapping function for object of given type exists. |
| * @return resulting {@code Value}. |
| */ |
| private Value mapObjectToValue(Object value) { |
| if (value == null) { |
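          // A null row value becomes a Value with no type set (VALUETYPE_NOT_SET); Datastore
          // also supports an explicit NullValue, which could arguably be used here instead.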
| return Value.newBuilder().build(); |
| } |
| return ((Function<Object, Value>) |
| MAPPING_FUNCTIONS.getOrDefault(value.getClass(), MAPPING_NOT_FOUND)) |
| .apply(value); |
| } |
| } |
| } |
| } |