blob: 90af770420ddd40e34ccf16b2368192da1f9bf8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
/**
 * Utility methods for translating Beam {@link Schema} objects to and from their portable {@link
 * RunnerApi.Schema} protobuf representation.
 */
public class SchemaTranslation {
  /**
   * One-to-one mapping between the SDK {@link TypeName} enum and the proto type-name enum. It is a
   * {@link BiMap} so the same table drives both directions of the translation.
   */
  private static final BiMap<TypeName, RunnerApi.Schema.TypeName> TYPE_NAME_MAPPING =
      ImmutableBiMap.<TypeName, RunnerApi.Schema.TypeName>builder()
          .put(TypeName.BYTE, RunnerApi.Schema.TypeName.BYTE)
          .put(TypeName.INT16, RunnerApi.Schema.TypeName.INT16)
          .put(TypeName.INT32, RunnerApi.Schema.TypeName.INT32)
          .put(TypeName.INT64, RunnerApi.Schema.TypeName.INT64)
          .put(TypeName.DECIMAL, RunnerApi.Schema.TypeName.DECIMAL)
          .put(TypeName.FLOAT, RunnerApi.Schema.TypeName.FLOAT)
          .put(TypeName.DOUBLE, RunnerApi.Schema.TypeName.DOUBLE)
          .put(TypeName.STRING, RunnerApi.Schema.TypeName.STRING)
          .put(TypeName.DATETIME, RunnerApi.Schema.TypeName.DATETIME)
          .put(TypeName.BOOLEAN, RunnerApi.Schema.TypeName.BOOLEAN)
          .put(TypeName.BYTES, RunnerApi.Schema.TypeName.BYTES)
          .put(TypeName.ARRAY, RunnerApi.Schema.TypeName.ARRAY)
          .put(TypeName.MAP, RunnerApi.Schema.TypeName.MAP)
          .put(TypeName.ROW, RunnerApi.Schema.TypeName.ROW)
          .put(TypeName.LOGICAL_TYPE, RunnerApi.Schema.TypeName.LOGICAL_TYPE)
          .build();

  /**
   * Converts an SDK {@link Schema} to its proto representation.
   *
   * <p>Each field is written with its index in {@code schema} as the proto field id and with its
   * encoding position. The schema's UUID becomes the proto id, or the empty string when the schema
   * has no UUID.
   *
   * @param schema the schema to translate
   * @return the proto form of {@code schema}
   * @throws IllegalArgumentException if a field uses a type name with no proto equivalent
   */
  public static RunnerApi.Schema toProto(Schema schema) {
    String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
    RunnerApi.Schema.Builder builder = RunnerApi.Schema.newBuilder().setId(uuid);
    for (Field field : schema.getFields()) {
      RunnerApi.Schema.Field protoField =
          toProto(
              field,
              schema.indexOf(field.getName()),
              schema.getEncodingPositions().get(field.getName()));
      builder.addFields(protoField);
    }
    return builder.build();
  }

  /**
   * Converts a single schema field to its proto form.
   *
   * @param field the field to translate
   * @param fieldId the field's index within its schema
   * @param position the field's encoding position
   */
  private static RunnerApi.Schema.Field toProto(Field field, int fieldId, int position) {
    return RunnerApi.Schema.Field.newBuilder()
        .setName(field.getName())
        // Proto string setters reject null, so an absent description is written as "".
        .setDescription(nullToEmpty(field.getDescription()))
        .setType(toProto(field.getType()))
        .setId(fieldId)
        .setEncodingPosition(position)
        .build();
  }

  /**
   * Converts a field type to its proto form, recursing into row schemas, array element types, map
   * key/value types, and logical-type base types.
   *
   * @throws IllegalArgumentException if the type name has no entry in {@code TYPE_NAME_MAPPING}
   */
  private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) {
    TypeName typeName = fieldType.getTypeName();
    RunnerApi.Schema.TypeName protoTypeName = TYPE_NAME_MAPPING.get(typeName);
    if (protoTypeName == null) {
      // Fail with a descriptive message instead of an NPE from the proto setter.
      throw new IllegalArgumentException("Unsupported schema type name: " + typeName);
    }
    RunnerApi.Schema.FieldType.Builder builder =
        RunnerApi.Schema.FieldType.newBuilder().setTypeName(protoTypeName);
    switch (typeName) {
      case ROW:
        builder.setRowSchema(toProto(fieldType.getRowSchema()));
        break;
      case ARRAY:
        builder.setCollectionElementType(toProto(fieldType.getCollectionElementType()));
        break;
      case MAP:
        builder.setMapType(
            RunnerApi.Schema.MapType.newBuilder()
                .setKeyType(toProto(fieldType.getMapKeyType()))
                .setValueType(toProto(fieldType.getMapValueType()))
                .build());
        break;
      case LOGICAL_TYPE:
        LogicalType logicalType = fieldType.getLogicalType();
        builder.setLogicalType(
            RunnerApi.Schema.LogicalType.newBuilder()
                .setId(logicalType.getIdentifier())
                .setArgs(nullToEmpty(logicalType.getArgument()))
                .setBaseType(toProto(logicalType.getBaseType()))
                // fieldTypeFromProto rebuilds the logical type solely from these serialized bytes.
                .setSerializedClass(
                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType)))
                .build());
        break;
      default:
        // Primitive types carry no extra information beyond the type name.
        break;
    }
    builder.setNullable(fieldType.getNullable());
    return builder.build();
  }

  /**
   * Converts a proto schema back into an SDK {@link Schema}.
   *
   * <p>Encoding positions are restored from each proto field. The UUID is set only when the proto
   * id is non-empty.
   *
   * @param protoSchema the proto schema to translate
   * @return the equivalent SDK schema
   * @throws IllegalArgumentException if the proto id is non-empty but not a valid UUID, or if a
   *     field uses a type name with no SDK equivalent
   */
  public static Schema fromProto(RunnerApi.Schema protoSchema) {
    Schema.Builder builder = Schema.builder();
    Map<String, Integer> encodingLocationMap = Maps.newHashMap();
    for (RunnerApi.Schema.Field protoField : protoSchema.getFieldsList()) {
      Field field = fieldFromProto(protoField);
      builder.addField(field);
      encodingLocationMap.put(protoField.getName(), protoField.getEncodingPosition());
    }
    Schema schema = builder.build();
    schema.setEncodingPositions(encodingLocationMap);
    if (!protoSchema.getId().isEmpty()) {
      schema.setUUID(UUID.fromString(protoSchema.getId()));
    }
    return schema;
  }

  /** Converts a proto field into an SDK {@link Field}, keeping its name, type and description. */
  private static Field fieldFromProto(RunnerApi.Schema.Field protoField) {
    return Field.of(protoField.getName(), fieldTypeFromProto(protoField.getType()))
        .withDescription(protoField.getDescription());
  }

  /**
   * Converts a proto field type into an SDK {@link FieldType}, recursing into nested types.
   *
   * @throws IllegalArgumentException if the proto type name has no entry in {@code
   *     TYPE_NAME_MAPPING}
   */
  private static FieldType fieldTypeFromProto(RunnerApi.Schema.FieldType protoFieldType) {
    RunnerApi.Schema.TypeName protoTypeName = protoFieldType.getTypeName();
    TypeName typeName = TYPE_NAME_MAPPING.inverse().get(protoTypeName);
    if (typeName == null) {
      // Without this check the switch below would throw an uninformative NullPointerException.
      throw new IllegalArgumentException("Unsupported proto schema type name: " + protoTypeName);
    }
    FieldType fieldType;
    switch (typeName) {
      case ROW:
        fieldType = FieldType.row(fromProto(protoFieldType.getRowSchema()));
        break;
      case ARRAY:
        fieldType = FieldType.array(fieldTypeFromProto(protoFieldType.getCollectionElementType()));
        break;
      case MAP:
        fieldType =
            FieldType.map(
                fieldTypeFromProto(protoFieldType.getMapType().getKeyType()),
                fieldTypeFromProto(protoFieldType.getMapType().getValueType()));
        break;
      case LOGICAL_TYPE:
        // NOTE(review): this Java-deserializes bytes read from the proto. Only translate protos
        // from a trusted source, or install an ObjectInputFilter in the deserialization path.
        LogicalType logicalType =
            (LogicalType)
                SerializableUtils.deserializeFromByteArray(
                    protoFieldType.getLogicalType().getSerializedClass().toByteArray(),
                    "logicalType");
        fieldType = FieldType.logicalType(logicalType);
        break;
      default:
        fieldType = FieldType.of(typeName);
        break;
    }
    if (protoFieldType.getNullable()) {
      fieldType = fieldType.withNullable(true);
    }
    return fieldType;
  }

  /** Returns {@code value}, or the empty string when it is null (proto setters reject null). */
  private static String nullToEmpty(String value) {
    return value == null ? "" : value;
  }
}