| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigquery; |
| |
| import static java.util.stream.Collectors.toList; |
| import static java.util.stream.Collectors.toMap; |
| import static org.apache.beam.sdk.values.Row.toRow; |
| |
| import com.google.api.services.bigquery.model.TableFieldSchema; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.auto.value.AutoValue; |
| import java.io.Serializable; |
| import java.math.BigDecimal; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Function; |
| import java.util.stream.IntStream; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.beam.sdk.schemas.LogicalTypes; |
| import org.apache.beam.sdk.schemas.Schema; |
| import org.apache.beam.sdk.schemas.Schema.Field; |
| import org.apache.beam.sdk.schemas.Schema.FieldType; |
| import org.apache.beam.sdk.schemas.Schema.TypeName; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.SerializableFunctions; |
| import org.apache.beam.sdk.values.Row; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding; |
| import org.joda.time.DateTime; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Instant; |
| import org.joda.time.ReadableInstant; |
| import org.joda.time.chrono.ISOChronology; |
| import org.joda.time.format.DateTimeFormatter; |
| import org.joda.time.format.DateTimeFormatterBuilder; |
| |
| /** Utility methods for BigQuery related operations. */ |
| public class BigQueryUtils { |
| |
  /** Options for how to convert BigQuery data to Beam data. */
  @AutoValue
  public abstract static class ConversionOptions implements Serializable {

    /**
     * Controls whether to truncate timestamps to millisecond precision lossily, or to crash when
     * truncation would result.
     */
    public enum TruncateTimestamps {
      /** Reject timestamps with greater-than-millisecond precision. */
      REJECT,

      /** Truncate timestamps to millisecond precision. */
      TRUNCATE;
    }

    /** The configured timestamp-truncation policy; defaults to {@link TruncateTimestamps#REJECT}. */
    public abstract TruncateTimestamps getTruncateTimestamps();

    /** Returns a builder pre-configured with the default {@link TruncateTimestamps#REJECT} policy. */
    public static Builder builder() {
      return new AutoValue_BigQueryUtils_ConversionOptions.Builder()
          .setTruncateTimestamps(TruncateTimestamps.REJECT);
    }

    /** Builder for {@link ConversionOptions}. */
    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setTruncateTimestamps(TruncateTimestamps truncateTimestamps);

      public abstract ConversionOptions build();
    }
  }
| |
  /** Prints timestamps in BigQuery's textual format, always with exactly 3 fractional digits. */
  private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;

  /**
   * Native BigQuery formatter for its timestamp format, depending on the milliseconds stored in
   * the column, the milli second part will be 6, 3 or absent. Example {@code 2019-08-16
   * 00:52:07[.123]|[.123456] UTC}
   */
  private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PARSER;

  static {
    // Shared "yyyy-MM-dd HH:mm:ss" core used by both the parser and the printer, pinned to UTC.
    DateTimeFormatter dateTimePart =
        new DateTimeFormatterBuilder()
            .appendYear(4, 4)
            .appendLiteral('-')
            .appendMonthOfYear(2)
            .appendLiteral('-')
            .appendDayOfMonth(2)
            .appendLiteral(' ')
            .appendHourOfDay(2)
            .appendLiteral(':')
            .appendMinuteOfHour(2)
            .appendLiteral(':')
            .appendSecondOfMinute(2)
            .toFormatter()
            .withZoneUTC();
    // Parser: fractional seconds (3 to 6 digits) are optional, the " UTC" suffix is mandatory.
    BIGQUERY_TIMESTAMP_PARSER =
        new DateTimeFormatterBuilder()
            .append(dateTimePart)
            .appendOptional(
                new DateTimeFormatterBuilder()
                    .appendLiteral('.')
                    .appendFractionOfSecond(3, 6)
                    .toParser())
            .appendLiteral(" UTC")
            .toFormatter()
            .withZoneUTC();
    // Printer: always emits exactly millisecond precision, e.g. "2019-08-16 00:52:07.123 UTC".
    BIGQUERY_TIMESTAMP_PRINTER =
        new DateTimeFormatterBuilder()
            .append(dateTimePart)
            .appendLiteral('.')
            .appendFractionOfSecond(3, 3)
            .appendLiteral(" UTC")
            .toFormatter();
  }
| |
  /** Mapping of Beam field type names to the BigQuery standard SQL types used to store them. */
  private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
      ImmutableMap.<TypeName, StandardSQLTypeName>builder()
          .put(TypeName.BYTE, StandardSQLTypeName.INT64)
          .put(TypeName.INT16, StandardSQLTypeName.INT64)
          .put(TypeName.INT32, StandardSQLTypeName.INT64)
          .put(TypeName.INT64, StandardSQLTypeName.INT64)
          .put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)
          .put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)
          .put(TypeName.DECIMAL, StandardSQLTypeName.NUMERIC)
          .put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)
          .put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)
          .put(TypeName.ROW, StandardSQLTypeName.STRUCT)
          .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
          .put(TypeName.STRING, StandardSQLTypeName.STRING)
          .put(TypeName.BYTES, StandardSQLTypeName.BYTES)
          .build();

  /** Parsers from BigQuery's JSON string encoding of a value to the corresponding Beam value. */
  private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =
      ImmutableMap.<TypeName, Function<String, Object>>builder()
          .put(TypeName.BYTE, Byte::valueOf)
          .put(TypeName.INT16, Short::valueOf)
          .put(TypeName.INT32, Integer::valueOf)
          .put(TypeName.INT64, Long::valueOf)
          .put(TypeName.FLOAT, Float::valueOf)
          .put(TypeName.DOUBLE, Double::valueOf)
          .put(TypeName.DECIMAL, BigDecimal::new)
          .put(TypeName.BOOLEAN, Boolean::valueOf)
          .put(TypeName.STRING, str -> str)
          .put(
              TypeName.DATETIME,
              str -> {
                if (str == null || str.length() == 0) {
                  return null;
                }
                // Timestamps arrive either as "yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC" strings or as
                // fractional epoch seconds; handle both encodings.
                if (str.endsWith("UTC")) {
                  return BIGQUERY_TIMESTAMP_PARSER.parseDateTime(str).toDateTime(DateTimeZone.UTC);
                } else {
                  return new DateTime(
                      (long) (Double.parseDouble(str) * 1000), ISOChronology.getInstanceUTC());
                }
              })
          .put(TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
          .build();

  // TODO: BigQuery code should not be relying on Calcite metadata fields. If so, this belongs
  // in the SQL package.
  /** Mapping of Beam SQL logical type identifiers to BigQuery standard SQL types. */
  private static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =
      ImmutableMap.<String, StandardSQLTypeName>builder()
          .put("SqlDateType", StandardSQLTypeName.DATE)
          .put("SqlTimeType", StandardSQLTypeName.TIME)
          .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
          .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.DATETIME)
          .put("SqlCharType", StandardSQLTypeName.STRING)
          .build();
| |
| /** |
| * Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Beam {@link |
| * FieldType}. |
| */ |
| private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { |
| if (fieldType.getTypeName().isLogicalType()) { |
| StandardSQLTypeName foundType = |
| BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier()); |
| if (foundType != null) { |
| return foundType; |
| } |
| } |
| return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName()); |
| } |
| |
  /**
   * Get the Beam {@link FieldType} from a BigQuery type name.
   *
   * <p>Supports both standard and legacy SQL types.
   *
   * @param typeName Name of the type
   * @param nestedFields Nested fields for the given type (eg. RECORD type)
   * @return Corresponding Beam {@link FieldType}
   */
  private static FieldType fromTableFieldSchemaType(
      String typeName, List<TableFieldSchema> nestedFields) {
    switch (typeName) {
      case "STRING":
        return FieldType.STRING;
      case "BYTES":
        return FieldType.BYTES;
      case "INT64":
      case "INTEGER":
        return FieldType.INT64;
      case "FLOAT64":
      case "FLOAT":
        return FieldType.DOUBLE;
      case "BOOL":
      case "BOOLEAN":
        return FieldType.BOOLEAN;
      case "TIMESTAMP":
        return FieldType.DATETIME;
      // TIME, DATE, and DATETIME are represented as DATETIME tagged with a SQL logical type
      // identifier so that toStandardSQLTypeName can round-trip them back to the BigQuery type.
      case "TIME":
        return FieldType.logicalType(
            new LogicalTypes.PassThroughLogicalType<Instant>(
                "SqlTimeType", "", FieldType.DATETIME) {});
      case "DATE":
        return FieldType.logicalType(
            new LogicalTypes.PassThroughLogicalType<Instant>(
                "SqlDateType", "", FieldType.DATETIME) {});
      case "DATETIME":
        return FieldType.logicalType(
            new LogicalTypes.PassThroughLogicalType<Instant>(
                "SqlTimestampWithLocalTzType", "", FieldType.DATETIME) {});
      case "STRUCT":
      case "RECORD":
        // Nested record: recurse into the sub-fields to build a row schema.
        Schema rowSchema = fromTableFieldSchema(nestedFields);
        return FieldType.row(rowSchema);
      default:
        throw new UnsupportedOperationException(
            "Converting BigQuery type " + typeName + " to Beam type is unsupported");
    }
  }
| |
| private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) { |
| Schema.Builder schemaBuilder = Schema.builder(); |
| for (TableFieldSchema tableFieldSchema : tableFieldSchemas) { |
| FieldType fieldType = |
| fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields()); |
| |
| Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf); |
| if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) { |
| fieldType = FieldType.array(fieldType); |
| } |
| |
| // if the mode is not defined or if it is set to NULLABLE, then the field is nullable |
| boolean nullable = |
| !fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent(); |
| Field field = Field.of(tableFieldSchema.getName(), fieldType).withNullable(nullable); |
| if (tableFieldSchema.getDescription() != null |
| && !"".equals(tableFieldSchema.getDescription())) { |
| field = field.withDescription(tableFieldSchema.getDescription()); |
| } |
| schemaBuilder.addField(field); |
| } |
| return schemaBuilder.build(); |
| } |
| |
  /** Converts a Beam {@link Schema} into the equivalent list of BigQuery {@link TableFieldSchema}s. */
  private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
    List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());
    for (Field schemaField : schema.getFields()) {
      FieldType type = schemaField.getType();

      TableFieldSchema field = new TableFieldSchema().setName(schemaField.getName());
      if (schemaField.getDescription() != null && !"".equals(schemaField.getDescription())) {
        field.setDescription(schemaField.getDescription());
      }

      // Non-nullable fields become REQUIRED; note this may be overwritten by REPEATED below,
      // since a repeated BigQuery field cannot also be marked REQUIRED.
      if (!schemaField.getType().getNullable()) {
        field.setMode(Mode.REQUIRED.toString());
      }
      if (TypeName.ARRAY == type.getTypeName()) {
        // Unwrap the element type; the remaining checks below operate on the element.
        type = type.getCollectionElementType();
        if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
          throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");
        }
        field.setMode(Mode.REPEATED.toString());
      }
      if (TypeName.ROW == type.getTypeName()) {
        // Nested rows become RECORD fields with their own sub-field list.
        Schema subType = type.getRowSchema();
        field.setFields(toTableFieldSchema(subType));
      }
      if (TypeName.MAP == type.getTypeName()) {
        throw new IllegalArgumentException("Maps are not supported in BigQuery.");
      }
      field.setType(toStandardSQLTypeName(type).toString());

      fields.add(field);
    }
    return fields;
  }
| |
  /** Convert a Beam {@link Schema} to a BigQuery {@link TableSchema}. */
  public static TableSchema toTableSchema(Schema schema) {
    return new TableSchema().setFields(toTableFieldSchema(schema));
  }

  /** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
  public static Schema fromTableSchema(TableSchema tableSchema) {
    return fromTableFieldSchema(tableSchema.getFields());
  }

  /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
  public static org.apache.avro.Schema toGenericAvroSchema(
      String schemaName, List<TableFieldSchema> fieldSchemas) {
    // Delegates to BigQueryAvroUtils, which owns the BigQuery-to-Avro schema mapping.
    return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
  }
| |
| private static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> |
| TABLE_ROW_TO_BEAM_ROW_FUNCTION = beamSchema -> (TableRow tr) -> toBeamRow(beamSchema, tr); |
| |
| public static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> tableRowToBeamRow() { |
| return TABLE_ROW_TO_BEAM_ROW_FUNCTION; |
| } |
| |
| private static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> |
| TABLE_ROW_FROM_BEAM_ROW_FUNCTION = ignored -> BigQueryUtils::toTableRow; |
| |
| public static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> tableRowFromBeamRow() { |
| return TABLE_ROW_FROM_BEAM_ROW_FUNCTION; |
| } |
| |
  /** Shared identity-based converter reused by the no-argument {@link #toTableRow()} overload. */
  private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
      new ToTableRow(SerializableFunctions.identity());

  /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
  public static SerializableFunction<Row, TableRow> toTableRow() {
    return ROW_TO_TABLE_ROW;
  }

  /** Convert a Beam schema type to a BigQuery {@link TableRow}. */
  public static <T> SerializableFunction<T, TableRow> toTableRow(
      SerializableFunction<T, Row> toRow) {
    return new ToTableRow<>(toRow);
  }
| |
  /**
   * Serializable converter that first maps an input element to a Beam {@link Row} via the
   * supplied function, then converts that {@link Row} to a BigQuery {@link TableRow}.
   */
  private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {
    // Maps the input element to a Beam Row before the TableRow conversion.
    private final SerializableFunction<T, Row> toRow;

    ToTableRow(SerializableFunction<T, Row> toRow) {
      this.toRow = toRow;
    }

    @Override
    public TableRow apply(T input) {
      return toTableRow(toRow.apply(input));
    }
  }
| |
| public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptions options) { |
| List<Object> valuesInOrder = |
| schema.getFields().stream() |
| .map( |
| field -> { |
| try { |
| return convertAvroFormat(field.getType(), record.get(field.getName()), options); |
| } catch (Exception cause) { |
| throw new IllegalArgumentException( |
| "Error converting field " + field + ": " + cause.getMessage(), cause); |
| } |
| }) |
| .collect(toList()); |
| |
| return Row.withSchema(schema).addValues(valuesInOrder).build(); |
| } |
| |
  /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
  public static TableRow toTableRow(Row row) {
    TableRow output = new TableRow();
    for (int i = 0; i < row.getFieldCount(); i++) {
      Object value = row.getValue(i);
      Field schemaField = row.getSchema().getField(i);
      output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value));
    }
    return output;
  }
| |
| private static Object fromBeamField(FieldType fieldType, Object fieldValue) { |
| if (fieldValue == null) { |
| if (!fieldType.getNullable()) { |
| throw new IllegalArgumentException("Field is not nullable."); |
| } |
| return null; |
| } |
| |
| switch (fieldType.getTypeName()) { |
| case ARRAY: |
| FieldType elementType = fieldType.getCollectionElementType(); |
| List items = (List) fieldValue; |
| List convertedItems = Lists.newArrayListWithCapacity(items.size()); |
| for (Object item : items) { |
| convertedItems.add(fromBeamField(elementType, item)); |
| } |
| return convertedItems; |
| |
| case ROW: |
| return toTableRow((Row) fieldValue); |
| |
| case DATETIME: |
| return ((Instant) fieldValue) |
| .toDateTime(DateTimeZone.UTC) |
| .toString(BIGQUERY_TIMESTAMP_PRINTER); |
| |
| case INT16: |
| case INT32: |
| case INT64: |
| case FLOAT: |
| case DOUBLE: |
| case STRING: |
| case BOOLEAN: |
| return fieldValue.toString(); |
| |
| case DECIMAL: |
| return fieldValue.toString(); |
| |
| case BYTES: |
| return BaseEncoding.base64().encode((byte[]) fieldValue); |
| |
| default: |
| return fieldValue; |
| } |
| } |
| |
| /** |
| * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}. |
| * |
| * <p>Only supports basic types and arrays. Doesn't support date types or structs. |
| */ |
| public static Row toBeamRow(Schema rowSchema, TableRow jsonBqRow) { |
| // TODO deprecate toBeamRow(Schema, TableSchema, TableRow) function in favour of this function. |
| // This function attempts to convert TableRows without having access to the |
| // corresponding TableSchema because: |
| // 1. TableSchema contains redundant information already available in the Schema object. |
| // 2. TableSchema objects are not serializable and are therefore harder to propagate through a |
| // pipeline. |
| return rowSchema.getFields().stream() |
| .map(field -> toBeamRowFieldValue(field, jsonBqRow.get(field.getName()))) |
| .collect(toRow(rowSchema)); |
| } |
| |
| private static Object toBeamRowFieldValue(Field field, Object bqValue) { |
| if (bqValue == null) { |
| if (field.getType().getNullable()) { |
| return null; |
| } else { |
| throw new IllegalArgumentException( |
| "Received null value for non-nullable field " + field.getName()); |
| } |
| } |
| |
| return toBeamValue(field.getType(), bqValue); |
| } |
| |
| /** |
| * Tries to parse the JSON {@link TableRow} from BigQuery. |
| * |
| * <p>Only supports basic types and arrays. Doesn't support date types. |
| */ |
| public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jsonBqRow) { |
| List<TableFieldSchema> bqFields = bqSchema.getFields(); |
| |
| Map<String, Integer> bqFieldIndices = |
| IntStream.range(0, bqFields.size()) |
| .boxed() |
| .collect(toMap(i -> bqFields.get(i).getName(), i -> i)); |
| |
| List<Object> rawJsonValues = |
| rowSchema.getFields().stream() |
| .map(field -> bqFieldIndices.get(field.getName())) |
| .map(index -> jsonBqRow.getF().get(index).getV()) |
| .collect(toList()); |
| |
| return IntStream.range(0, rowSchema.getFieldCount()) |
| .boxed() |
| .map(index -> toBeamValue(rowSchema.getField(index).getType(), rawJsonValues.get(index))) |
| .collect(toRow(rowSchema)); |
| } |
| |
  /**
   * Converts a raw JSON-decoded BigQuery value to the Beam value required by {@code fieldType}.
   *
   * <p>Strings are parsed via {@link #JSON_VALUE_PARSERS}; lists are BigQuery repeated fields
   * encoded as {@code [{"v": ...}, ...]}; maps are nested records converted recursively.
   *
   * @throws UnsupportedOperationException if no conversion is known for the value/type pair
   */
  private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) {
    if (jsonBQValue instanceof String && JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
      return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply((String) jsonBQValue);
    }

    if (jsonBQValue instanceof List) {
      // Repeated fields arrive as a list of {"v": element} wrappers; unwrap then convert each
      // element against the array's element type.
      return ((List<Object>) jsonBQValue)
          .stream()
          .map(v -> ((Map<String, Object>) v).get("v"))
          .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
          .collect(toList());
    }

    if (jsonBQValue instanceof Map) {
      // A nested record: re-wrap as a TableRow and recurse with the row schema.
      TableRow tr = new TableRow();
      tr.putAll((Map<String, Object>) jsonBQValue);
      return toBeamRow(fieldType.getRowSchema(), tr);
    }

    throw new UnsupportedOperationException(
        "Converting BigQuery type '"
            + jsonBQValue.getClass()
            + "' to '"
            + fieldType
            + "' is not supported");
  }
| |
  // TODO: BigQuery shouldn't know about SQL internal logical types.
  /** Logical type identifiers that are converted as microsecond timestamps from Avro. */
  private static final Set<String> SQL_DATE_TIME_TYPES =
      ImmutableSet.of(
          "SqlDateType", "SqlTimeType", "SqlTimeWithLocalTzType", "SqlTimestampWithLocalTzType");
  /** Logical type identifiers that are converted as strings from Avro. */
  private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
| |
| /** |
| * Tries to convert an Avro decoded value to a Beam field value based on the target type of the |
| * Beam field. |
| */ |
| public static Object convertAvroFormat( |
| FieldType beamFieldType, Object avroValue, BigQueryUtils.ConversionOptions options) { |
| TypeName beamFieldTypeName = beamFieldType.getTypeName(); |
| if (avroValue == null) { |
| if (beamFieldType.getNullable()) { |
| return null; |
| } else { |
| throw new IllegalArgumentException(String.format("Field %s not nullable", beamFieldType)); |
| } |
| } |
| switch (beamFieldTypeName) { |
| case INT16: |
| case INT32: |
| case INT64: |
| case FLOAT: |
| case DOUBLE: |
| case BYTE: |
| case BOOLEAN: |
| return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue); |
| case DATETIME: |
| // Expecting value in microseconds. |
| switch (options.getTruncateTimestamps()) { |
| case TRUNCATE: |
| return truncateToMillis(avroValue); |
| case REJECT: |
| return safeToMillis(avroValue); |
| default: |
| throw new IllegalArgumentException( |
| String.format( |
| "Unknown timestamp truncation option: %s", options.getTruncateTimestamps())); |
| } |
| case STRING: |
| return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue); |
| case ARRAY: |
| return convertAvroArray(beamFieldType, avroValue, options); |
| case LOGICAL_TYPE: |
| String identifier = beamFieldType.getLogicalType().getIdentifier(); |
| if (SQL_DATE_TIME_TYPES.contains(identifier)) { |
| switch (options.getTruncateTimestamps()) { |
| case TRUNCATE: |
| return truncateToMillis(avroValue); |
| case REJECT: |
| return safeToMillis(avroValue); |
| default: |
| throw new IllegalArgumentException( |
| String.format( |
| "Unknown timestamp truncation option: %s", options.getTruncateTimestamps())); |
| } |
| } else if (SQL_STRING_TYPES.contains(identifier)) { |
| return convertAvroPrimitiveTypes(TypeName.STRING, avroValue); |
| } else { |
| throw new RuntimeException("Unknown logical type " + identifier); |
| } |
| case ROW: |
| Schema rowSchema = beamFieldType.getRowSchema(); |
| if (rowSchema == null) { |
| throw new IllegalArgumentException("Nested ROW missing row schema"); |
| } |
| GenericData.Record record = (GenericData.Record) avroValue; |
| return toBeamRow(record, rowSchema, options); |
| case DECIMAL: |
| throw new RuntimeException("Does not support converting DECIMAL type value"); |
| case MAP: |
| throw new RuntimeException("Does not support converting MAP type value"); |
| default: |
| throw new RuntimeException( |
| "Does not support converting unknown type value: " + beamFieldTypeName); |
| } |
| } |
| |
  /**
   * Converts a microsecond epoch value to a millisecond {@link Instant}, throwing if any
   * sub-millisecond precision would be lost.
   *
   * @throws IllegalArgumentException if the value has non-zero microsecond remainder
   */
  private static ReadableInstant safeToMillis(Object value) {
    // Non-zero remainder mod 1000 means the microsecond value cannot be represented in millis.
    long subMilliPrecision = ((long) value) % 1000;
    if (subMilliPrecision != 0) {
      throw new IllegalArgumentException(
          String.format(
              "BigQuery data contained value %s with sub-millisecond precision, which Beam does"
                  + " not currently support."
                  + " You can enable truncating timestamps to millisecond precision"
                  + " by using BigQueryIO.withTruncatedTimestamps",
              value));
    } else {
      return truncateToMillis(value);
    }
  }

  /** Converts a microsecond epoch value to a millisecond {@link Instant}, discarding micros. */
  private static ReadableInstant truncateToMillis(Object value) {
    return new Instant((long) value / 1000);
  }
| |
| private static Object convertAvroArray( |
| FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) { |
| // Check whether the type of array element is equal. |
| List<Object> values = (List<Object>) value; |
| List<Object> ret = new ArrayList(); |
| FieldType collectionElement = beamField.getCollectionElementType(); |
| for (Object v : values) { |
| ret.add(convertAvroFormat(collectionElement, v, options)); |
| } |
| return (Object) ret; |
| } |
| |
| private static Object convertAvroString(Object value) { |
| if (value == null) { |
| return null; |
| } else if (value instanceof org.apache.avro.util.Utf8) { |
| return ((org.apache.avro.util.Utf8) value).toString(); |
| } else if (value instanceof String) { |
| return value; |
| } else { |
| throw new RuntimeException( |
| "Does not support converting avro format: " + value.getClass().getName()); |
| } |
| } |
| |
  /**
   * Converts an Avro-decoded primitive to the requested Beam primitive type. Avro decodes all
   * integral BigQuery values as {@link Long} and floating-point values as {@link Double}, so the
   * narrower Beam types are produced by down-casting here.
   *
   * @throws RuntimeException for DECIMAL or any non-primitive type
   */
  private static Object convertAvroPrimitiveTypes(TypeName beamType, Object value) {
    switch (beamType) {
      case BYTE:
        return ((Long) value).byteValue();
      case INT16:
        return ((Long) value).shortValue();
      case INT32:
        return ((Long) value).intValue();
      case INT64:
        return value;
      case FLOAT:
        return ((Double) value).floatValue();
      case DOUBLE:
        return (Double) value;
      case BOOLEAN:
        return (Boolean) value;
      case DECIMAL:
        throw new RuntimeException("Does not support converting DECIMAL type value");
      case STRING:
        return convertAvroString(value);
      default:
        throw new RuntimeException(beamType + " is not primitive type.");
    }
  }
| } |