/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.values.Row.toRow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.StandardSQLTypeName;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
/** Utility methods for BigQuery related operations. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BigQueryUtils {
// For parsing the format returned on the API proto:
// google.cloud.bigquery.storage.v1.ReadSession.getTable()
// "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
private static final Pattern TABLE_RESOURCE_PATTERN =
Pattern.compile(
"^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
// For parsing the format used to refer to tables parameters in BigQueryIO.
// "{project_id}:{dataset_id}.{table_id}" or
// "{project_id}.{dataset_id}.{table_id}"
private static final Pattern SIMPLE_TABLE_PATTERN =
Pattern.compile("^(?<PROJECT>[^\\.:]+)[\\.:](?<DATASET>[^\\.:]+)[\\.](?<TABLE>[^\\.:]+)$");
  /**
   * Options for how to convert BigQuery data to Beam data.
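   *
   * <p>A minimal construction sketch (the option value shown is illustrative; {@code REJECT} is
   * the builder default):
   *
   * <pre>{@code
   * ConversionOptions options =
   *     ConversionOptions.builder()
   *         .setTruncateTimestamps(TruncateTimestamps.TRUNCATE)
   *         .build();
   * }</pre>
   */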
@AutoValue
public abstract static class ConversionOptions implements Serializable {
    /**
     * Controls whether to lossily truncate timestamps to millisecond precision, or to reject (by
     * throwing an exception) timestamps that would require truncation.
     */
public enum TruncateTimestamps {
/** Reject timestamps with greater-than-millisecond precision. */
REJECT,
/** Truncate timestamps to millisecond precision. */
TRUNCATE;
}
public abstract TruncateTimestamps getTruncateTimestamps();
public static Builder builder() {
return new AutoValue_BigQueryUtils_ConversionOptions.Builder()
.setTruncateTimestamps(TruncateTimestamps.REJECT);
}
/** Builder for {@link ConversionOptions}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTruncateTimestamps(TruncateTimestamps truncateTimestamps);
public abstract ConversionOptions build();
}
}
/** Options for how to convert BigQuery schemas to Beam schemas. */
@AutoValue
public abstract static class SchemaConversionOptions implements Serializable {
    /**
     * Controls whether to use the map or row FieldType for a TableSchema field that appears to
     * represent a map (it is an array of structs containing only {@code key} and {@code value}
     * fields).
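     *
     * <p>A usage sketch ({@code tableSchema} stands for any {@link TableSchema} whose repeated
     * STRUCT fields have exactly {@code key} and {@code value} children):
     *
     * <pre>{@code
     * SchemaConversionOptions options =
     *     SchemaConversionOptions.builder().setInferMaps(true).build();
     * Schema beamSchema = BigQueryUtils.fromTableSchema(tableSchema, options);
     * }</pre>
     */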
public abstract boolean getInferMaps();
public static Builder builder() {
return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false);
}
/** Builder for {@link SchemaConversionOptions}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setInferMaps(boolean inferMaps);
public abstract SchemaConversionOptions build();
}
}
private static final String BIGQUERY_TIME_PATTERN = "HH:mm:ss[.SSSSSS]";
private static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
java.time.format.DateTimeFormatter.ofPattern(BIGQUERY_TIME_PATTERN);
private static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
java.time.format.DateTimeFormatter.ofPattern("uuuu-MM-dd'T'" + BIGQUERY_TIME_PATTERN);
private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;
  /**
   * Native BigQuery parser for its timestamp format. Depending on the precision stored in the
   * column, the fractional-second part has 6 digits, 3 digits, or is absent. Example: {@code
   * 2019-08-16 00:52:07[.123|.123456] UTC}
   */
private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PARSER;
static {
DateTimeFormatter dateTimePart =
new DateTimeFormatterBuilder()
.appendYear(4, 4)
.appendLiteral('-')
.appendMonthOfYear(2)
.appendLiteral('-')
.appendDayOfMonth(2)
.appendLiteral(' ')
.appendHourOfDay(2)
.appendLiteral(':')
.appendMinuteOfHour(2)
.appendLiteral(':')
.appendSecondOfMinute(2)
.toFormatter()
.withZoneUTC();
BIGQUERY_TIMESTAMP_PARSER =
new DateTimeFormatterBuilder()
.append(dateTimePart)
.appendOptional(
new DateTimeFormatterBuilder()
.appendLiteral('.')
.appendFractionOfSecond(3, 6)
.toParser())
.appendLiteral(" UTC")
.toFormatter()
.withZoneUTC();
BIGQUERY_TIMESTAMP_PRINTER =
new DateTimeFormatterBuilder()
.append(dateTimePart)
.appendLiteral('.')
.appendFractionOfSecond(3, 3)
.appendLiteral(" UTC")
.toFormatter();
}
private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
ImmutableMap.<TypeName, StandardSQLTypeName>builder()
.put(TypeName.BYTE, StandardSQLTypeName.INT64)
.put(TypeName.INT16, StandardSQLTypeName.INT64)
.put(TypeName.INT32, StandardSQLTypeName.INT64)
.put(TypeName.INT64, StandardSQLTypeName.INT64)
.put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)
.put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)
.put(TypeName.DECIMAL, StandardSQLTypeName.NUMERIC)
.put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)
.put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)
.put(TypeName.ITERABLE, StandardSQLTypeName.ARRAY)
.put(TypeName.ROW, StandardSQLTypeName.STRUCT)
.put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
.put(TypeName.STRING, StandardSQLTypeName.STRING)
.put(TypeName.BYTES, StandardSQLTypeName.BYTES)
.build();
private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =
ImmutableMap.<TypeName, Function<String, Object>>builder()
.put(TypeName.BYTE, Byte::valueOf)
.put(TypeName.INT16, Short::valueOf)
.put(TypeName.INT32, Integer::valueOf)
.put(TypeName.INT64, Long::valueOf)
.put(TypeName.FLOAT, Float::valueOf)
.put(TypeName.DOUBLE, Double::valueOf)
.put(TypeName.DECIMAL, BigDecimal::new)
.put(TypeName.BOOLEAN, Boolean::valueOf)
.put(TypeName.STRING, str -> str)
.put(
TypeName.DATETIME,
str -> {
if (str == null || str.length() == 0) {
return null;
}
if (str.endsWith("UTC")) {
return BIGQUERY_TIMESTAMP_PARSER.parseDateTime(str).toDateTime(DateTimeZone.UTC);
} else {
return new DateTime(
(long) (Double.parseDouble(str) * 1000), ISOChronology.getInstanceUTC());
}
})
.put(TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
.build();
// TODO: BigQuery code should not be relying on Calcite metadata fields. If so, this belongs
// in the SQL package.
static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =
ImmutableMap.<String, StandardSQLTypeName>builder()
.put(SqlTypes.DATE.getIdentifier(), StandardSQLTypeName.DATE)
.put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
.put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
.put("SqlCharType", StandardSQLTypeName.STRING)
.put("Enum", StandardSQLTypeName.STRING)
.build();
private static final String BIGQUERY_MAP_KEY_FIELD_NAME = "key";
private static final String BIGQUERY_MAP_VALUE_FIELD_NAME = "value";
/**
   * Get the corresponding BigQuery {@link StandardSQLTypeName} for a supported Beam {@link
   * FieldType}.
*/
static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
StandardSQLTypeName ret;
if (fieldType.getTypeName().isLogicalType()) {
ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());
if (ret == null) {
throw new IllegalArgumentException(
"Cannot convert Beam logical type: "
+ fieldType.getLogicalType().getIdentifier()
+ " to BigQuery type.");
}
} else {
ret = BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
if (ret == null) {
throw new IllegalArgumentException(
"Cannot convert Beam type: " + fieldType.getTypeName() + " to BigQuery type.");
}
}
return ret;
}
/**
* Get the Beam {@link FieldType} from a BigQuery type name.
*
* <p>Supports both standard and legacy SQL types.
*
* @param typeName Name of the type
   * @param nestedFields Nested fields for the given type (e.g. RECORD type)
* @return Corresponding Beam {@link FieldType}
*/
@Experimental(Kind.SCHEMAS)
private static FieldType fromTableFieldSchemaType(
String typeName, List<TableFieldSchema> nestedFields, SchemaConversionOptions options) {
switch (typeName) {
case "STRING":
return FieldType.STRING;
case "BYTES":
return FieldType.BYTES;
case "INT64":
case "INTEGER":
return FieldType.INT64;
case "FLOAT64":
case "FLOAT":
return FieldType.DOUBLE;
case "BOOL":
case "BOOLEAN":
return FieldType.BOOLEAN;
case "NUMERIC":
return FieldType.DECIMAL;
case "TIMESTAMP":
return FieldType.DATETIME;
case "TIME":
return FieldType.logicalType(SqlTypes.TIME);
case "DATE":
return FieldType.logicalType(SqlTypes.DATE);
case "DATETIME":
return FieldType.logicalType(SqlTypes.DATETIME);
case "STRUCT":
case "RECORD":
if (options.getInferMaps() && nestedFields.size() == 2) {
TableFieldSchema key = nestedFields.get(0);
TableFieldSchema value = nestedFields.get(1);
if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName())
&& BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) {
return FieldType.map(
fromTableFieldSchemaType(key.getType(), key.getFields(), options),
fromTableFieldSchemaType(value.getType(), value.getFields(), options));
}
}
Schema rowSchema = fromTableFieldSchema(nestedFields, options);
return FieldType.row(rowSchema);
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + typeName + " to Beam type is unsupported");
}
}
private static Schema fromTableFieldSchema(
List<TableFieldSchema> tableFieldSchemas, SchemaConversionOptions options) {
Schema.Builder schemaBuilder = Schema.builder();
for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
FieldType fieldType =
fromTableFieldSchemaType(
tableFieldSchema.getType(), tableFieldSchema.getFields(), options);
Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()
&& !fieldType.getTypeName().isMapType()) {
fieldType = FieldType.array(fieldType);
}
// if the mode is not defined or if it is set to NULLABLE, then the field is nullable
boolean nullable =
!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent();
Field field = Field.of(tableFieldSchema.getName(), fieldType).withNullable(nullable);
if (tableFieldSchema.getDescription() != null
&& !"".equals(tableFieldSchema.getDescription())) {
field = field.withDescription(tableFieldSchema.getDescription());
}
schemaBuilder.addField(field);
}
return schemaBuilder.build();
}
private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());
for (Field schemaField : schema.getFields()) {
FieldType type = schemaField.getType();
TableFieldSchema field = new TableFieldSchema().setName(schemaField.getName());
if (schemaField.getDescription() != null && !"".equals(schemaField.getDescription())) {
field.setDescription(schemaField.getDescription());
}
if (!schemaField.getType().getNullable()) {
field.setMode(Mode.REQUIRED.toString());
}
if (type.getTypeName().isCollectionType()) {
type = type.getCollectionElementType();
if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");
}
field.setMode(Mode.REPEATED.toString());
}
if (TypeName.ROW == type.getTypeName()) {
Schema subType = type.getRowSchema();
field.setFields(toTableFieldSchema(subType));
}
if (TypeName.MAP == type.getTypeName()) {
Schema mapSchema =
Schema.builder()
.addField(BIGQUERY_MAP_KEY_FIELD_NAME, type.getMapKeyType())
.addField(BIGQUERY_MAP_VALUE_FIELD_NAME, type.getMapValueType())
.build();
type = FieldType.row(mapSchema);
field.setFields(toTableFieldSchema(mapSchema));
field.setMode(Mode.REPEATED.toString());
}
field.setType(toStandardSQLTypeName(type).toString());
fields.add(field);
}
return fields;
}
  /**
   * Convert a Beam {@link Schema} to a BigQuery {@link TableSchema}.
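   *
   * <p>For example (field names are arbitrary):
   *
   * <pre>{@code
   * Schema beamSchema =
   *     Schema.builder().addStringField("word").addInt64Field("count").build();
   * TableSchema tableSchema = BigQueryUtils.toTableSchema(beamSchema);
   * }</pre>
   */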
@Experimental(Kind.SCHEMAS)
public static TableSchema toTableSchema(Schema schema) {
return new TableSchema().setFields(toTableFieldSchema(schema));
}
  /**
   * Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}.
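   *
   * <p>Uses the default {@link SchemaConversionOptions}, i.e. it is equivalent to:
   *
   * <pre>{@code
   * Schema beamSchema =
   *     BigQueryUtils.fromTableSchema(tableSchema, SchemaConversionOptions.builder().build());
   * }</pre>
   */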
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema) {
return fromTableSchema(tableSchema, SchemaConversionOptions.builder().build());
}
/** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
@Experimental(Kind.SCHEMAS)
public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOptions options) {
return fromTableFieldSchema(tableSchema.getFields(), options);
}
/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
@Experimental(Kind.SCHEMAS)
public static org.apache.avro.Schema toGenericAvroSchema(
String schemaName, List<TableFieldSchema> fieldSchemas) {
return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
}
private static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow>
TABLE_ROW_TO_BEAM_ROW_FUNCTION = beamSchema -> (TableRow tr) -> toBeamRow(beamSchema, tr);
  public static BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> tableRowToBeamRow() {
return TABLE_ROW_TO_BEAM_ROW_FUNCTION;
}
private static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow>
TABLE_ROW_FROM_BEAM_ROW_FUNCTION = ignored -> BigQueryUtils::toTableRow;
  public static BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> tableRowFromBeamRow() {
return TABLE_ROW_FROM_BEAM_ROW_FUNCTION;
}
private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
new ToTableRow<>(SerializableFunctions.identity());
  /**
   * Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
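   *
   * <p>For example (values are illustrative):
   *
   * <pre>{@code
   * Schema schema = Schema.builder().addStringField("word").addInt64Field("count").build();
   * Row row = Row.withSchema(schema).addValues("beam", 42L).build();
   * TableRow tableRow = BigQueryUtils.toTableRow().apply(row);
   * }</pre>
   */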
public static SerializableFunction<Row, TableRow> toTableRow() {
return ROW_TO_TABLE_ROW;
}
/** Convert a Beam schema type to a BigQuery {@link TableRow}. */
public static <T> SerializableFunction<T, TableRow> toTableRow(
SerializableFunction<T, Row> toRow) {
return new ToTableRow<>(toRow);
}
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {
private final SerializableFunction<T, Row> toRow;
ToTableRow(SerializableFunction<T, Row> toRow) {
this.toRow = toRow;
}
@Override
public TableRow apply(T input) {
return toTableRow(toRow.apply(input));
}
}
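  /**
   * Convert an Avro {@link GenericRecord} read from BigQuery into a Beam {@link Row} with the
   * given schema, using {@link ConversionOptions} to control how sub-millisecond timestamps are
   * handled.
   */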
@Experimental(Kind.SCHEMAS)
public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptions options) {
List<Object> valuesInOrder =
schema.getFields().stream()
.map(
field -> {
try {
return convertAvroFormat(field.getType(), record.get(field.getName()), options);
} catch (Exception cause) {
throw new IllegalArgumentException(
"Error converting field " + field + ": " + cause.getMessage(), cause);
}
})
.collect(toList());
return Row.withSchema(schema).addValues(valuesInOrder).build();
}
public static TableRow convertGenericRecordToTableRow(
GenericRecord record, TableSchema tableSchema) {
return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
}
  /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
Object value = row.getValue(i);
Field schemaField = row.getSchema().getField(i);
output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value));
}
return output;
}
private static @Nullable Object fromBeamField(FieldType fieldType, Object fieldValue) {
if (fieldValue == null) {
if (!fieldType.getNullable()) {
throw new IllegalArgumentException("Field is not nullable.");
}
return null;
}
switch (fieldType.getTypeName()) {
case ARRAY:
case ITERABLE:
FieldType elementType = fieldType.getCollectionElementType();
Iterable<?> items = (Iterable<?>) fieldValue;
List<Object> convertedItems = Lists.newArrayListWithCapacity(Iterables.size(items));
for (Object item : items) {
convertedItems.add(fromBeamField(elementType, item));
}
return convertedItems;
case MAP:
FieldType keyElementType = fieldType.getMapKeyType();
FieldType valueElementType = fieldType.getMapValueType();
Map<?, ?> pairs = (Map<?, ?>) fieldValue;
convertedItems = Lists.newArrayListWithCapacity(pairs.size());
for (Map.Entry<?, ?> pair : pairs.entrySet()) {
convertedItems.add(
new TableRow()
.set(BIGQUERY_MAP_KEY_FIELD_NAME, fromBeamField(keyElementType, pair.getKey()))
.set(
BIGQUERY_MAP_VALUE_FIELD_NAME,
fromBeamField(valueElementType, pair.getValue())));
}
return convertedItems;
case ROW:
return toTableRow((Row) fieldValue);
case DATETIME:
return ((Instant) fieldValue)
.toDateTime(DateTimeZone.UTC)
.toString(BIGQUERY_TIMESTAMP_PRINTER);
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case STRING:
case BOOLEAN:
case DECIMAL:
return fieldValue.toString();
case BYTES:
return BaseEncoding.base64().encode((byte[]) fieldValue);
case LOGICAL_TYPE:
// For the JSON formats of DATE/DATETIME/TIME/TIMESTAMP types that BigQuery accepts, see
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#details_of_loading_json_data
String identifier = fieldType.getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
return fieldValue.toString();
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
// LocalTime.toString() drops seconds if it is zero (see
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalTime.html#toString--).
// but BigQuery TIME requires seconds
// (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type).
// Fractional seconds are optional so drop them to conserve number of bytes transferred.
LocalTime localTime = (LocalTime) fieldValue;
          // Suppression is justified because seconds are always output.
          @SuppressWarnings("JavaLocalTimeGetNano")
java.time.format.DateTimeFormatter localTimeFormatter =
(0 == localTime.getNano()) ? ISO_LOCAL_TIME : BIGQUERY_TIME_FORMATTER;
return localTimeFormatter.format(localTime);
} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
// Same rationale as SqlTypes.TIME
LocalDateTime localDateTime = (LocalDateTime) fieldValue;
@SuppressWarnings("JavaLocalDateTimeGetNano")
java.time.format.DateTimeFormatter localDateTimeFormatter =
(0 == localDateTime.getNano()) ? ISO_LOCAL_DATE_TIME : BIGQUERY_DATETIME_FORMATTER;
return localDateTimeFormatter.format(localDateTime);
} else if ("Enum".equals(identifier)) {
return fieldType
.getLogicalType(EnumerationType.class)
.toString((EnumerationType.Value) fieldValue);
} // fall through
default:
return fieldValue.toString();
}
}
  /**
   * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}.
   *
   * <p>Supports basic types, DATE/TIME/DATETIME logical types, arrays, and nested rows.
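   *
   * <p>For example (string values are parsed according to the schema's field types):
   *
   * <pre>{@code
   * Schema schema = Schema.builder().addStringField("word").addInt64Field("count").build();
   * TableRow tableRow = new TableRow().set("word", "beam").set("count", "42");
   * Row row = BigQueryUtils.toBeamRow(schema, tableRow);
   * }</pre>
   */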
@Experimental(Kind.SCHEMAS)
public static Row toBeamRow(Schema rowSchema, TableRow jsonBqRow) {
// TODO deprecate toBeamRow(Schema, TableSchema, TableRow) function in favour of this function.
// This function attempts to convert TableRows without having access to the
// corresponding TableSchema because:
// 1. TableSchema contains redundant information already available in the Schema object.
// 2. TableSchema objects are not serializable and are therefore harder to propagate through a
// pipeline.
return rowSchema.getFields().stream()
.map(field -> toBeamRowFieldValue(field, jsonBqRow.get(field.getName())))
.collect(toRow(rowSchema));
}
private static Object toBeamRowFieldValue(Field field, Object bqValue) {
if (bqValue == null) {
if (field.getType().getNullable()) {
return null;
} else {
throw new IllegalArgumentException(
"Received null value for non-nullable field " + field.getName());
}
}
return toBeamValue(field.getType(), bqValue);
}
  /**
   * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}, using the
   * given {@link TableSchema} to look up fields by name. Expects the row in the API's {@code
   * f}/{@code v} (field/value) cell format.
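   *
   * <p>A sketch of the expected input shape, assuming {@code rowSchema} and {@code bqSchema} both
   * describe the fields {@code word} and {@code count} ({@code TableCell} is {@code
   * com.google.api.services.bigquery.model.TableCell}):
   *
   * <pre>{@code
   * TableRow tableRow =
   *     new TableRow()
   *         .setF(
   *             ImmutableList.of(
   *                 new TableCell().setV("beam"), new TableCell().setV("42")));
   * Row row = BigQueryUtils.toBeamRow(rowSchema, bqSchema, tableRow);
   * }</pre>
   */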
@Experimental(Kind.SCHEMAS)
public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jsonBqRow) {
List<TableFieldSchema> bqFields = bqSchema.getFields();
Map<String, Integer> bqFieldIndices =
IntStream.range(0, bqFields.size())
.boxed()
.collect(toMap(i -> bqFields.get(i).getName(), i -> i));
List<Object> rawJsonValues =
rowSchema.getFields().stream()
.map(field -> bqFieldIndices.get(field.getName()))
.map(index -> jsonBqRow.getF().get(index).getV())
.collect(toList());
return IntStream.range(0, rowSchema.getFieldCount())
.boxed()
.map(index -> toBeamValue(rowSchema.getField(index).getType(), rawJsonValues.get(index)))
.collect(toRow(rowSchema));
}
private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) {
if (jsonBQValue instanceof String) {
String jsonBQString = (String) jsonBQValue;
if (JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(jsonBQString);
} else if (fieldType.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return LocalTime.parse(jsonBQString);
}
}
if (jsonBQValue instanceof List) {
return ((List<Object>) jsonBQValue)
.stream()
.map(v -> ((Map<String, Object>) v).get("v"))
.map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
.collect(toList());
}
if (jsonBQValue instanceof Map) {
TableRow tr = new TableRow();
tr.putAll((Map<String, Object>) jsonBQValue);
return toBeamRow(fieldType.getRowSchema(), tr);
}
throw new UnsupportedOperationException(
"Converting BigQuery type '"
+ jsonBQValue.getClass()
+ "' to '"
+ fieldType
+ "' is not supported");
}
// TODO: BigQuery shouldn't know about SQL internal logical types.
private static final Set<String> SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType");
private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
/**
* Tries to convert an Avro decoded value to a Beam field value based on the target type of the
* Beam field.
*
* <p>For the Avro formats of BigQuery types, see
* https://cloud.google.com/bigquery/docs/exporting-data#avro_export_details and
* https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
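   *
   * <p>For instance, a BigQuery TIMESTAMP arrives here as microseconds since the epoch; the
   * literal below is 2021-01-01T00:00:00Z (a sketch using the default {@code REJECT} options):
   *
   * <pre>{@code
   * Object instant =
   *     BigQueryUtils.convertAvroFormat(
   *         FieldType.DATETIME, 1609459200000000L, ConversionOptions.builder().build());
   * // -> org.joda.time.Instant "2021-01-01T00:00:00.000Z"
   * }</pre>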
*/
public static Object convertAvroFormat(
FieldType beamFieldType, Object avroValue, BigQueryUtils.ConversionOptions options) {
TypeName beamFieldTypeName = beamFieldType.getTypeName();
if (avroValue == null) {
if (beamFieldType.getNullable()) {
return null;
} else {
throw new IllegalArgumentException(String.format("Field %s not nullable", beamFieldType));
}
}
switch (beamFieldTypeName) {
case BYTE:
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case STRING:
case BYTES:
case BOOLEAN:
return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue);
case DATETIME:
// Expecting value in microseconds.
switch (options.getTruncateTimestamps()) {
case TRUNCATE:
return truncateToMillis(avroValue);
case REJECT:
return safeToMillis(avroValue);
default:
throw new IllegalArgumentException(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
case DECIMAL:
return convertAvroNumeric(avroValue);
case ARRAY:
return convertAvroArray(beamFieldType, avroValue, options);
case LOGICAL_TYPE:
String identifier = beamFieldType.getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
return convertAvroDate(avroValue);
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
return convertAvroTime(avroValue);
} else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
return convertAvroDateTime(avroValue);
} else if (SQL_DATE_TIME_TYPES.contains(identifier)) {
switch (options.getTruncateTimestamps()) {
case TRUNCATE:
return truncateToMillis(avroValue);
case REJECT:
return safeToMillis(avroValue);
default:
throw new IllegalArgumentException(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
} else if (SQL_STRING_TYPES.contains(identifier)) {
return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
} else {
throw new RuntimeException("Unknown logical type " + identifier);
}
case ROW:
Schema rowSchema = beamFieldType.getRowSchema();
if (rowSchema == null) {
throw new IllegalArgumentException("Nested ROW missing row schema");
}
GenericData.Record record = (GenericData.Record) avroValue;
return toBeamRow(record, rowSchema, options);
case MAP:
return convertAvroRecordToMap(beamFieldType, avroValue, options);
default:
throw new RuntimeException(
"Does not support converting unknown type value: " + beamFieldTypeName);
}
}
private static ReadableInstant safeToMillis(Object value) {
long subMilliPrecision = ((long) value) % 1000;
if (subMilliPrecision != 0) {
throw new IllegalArgumentException(
String.format(
"BigQuery data contained value %s with sub-millisecond precision, which Beam does"
+ " not currently support."
+ " You can enable truncating timestamps to millisecond precision"
+ " by using BigQueryIO.withTruncatedTimestamps",
value));
} else {
return truncateToMillis(value);
}
}
private static ReadableInstant truncateToMillis(Object value) {
return new Instant((long) value / 1000);
}
private static Object convertAvroArray(
FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
    // Convert each array element to the Beam collection element type.
List<Object> values = (List<Object>) value;
List<Object> ret = new ArrayList<>();
FieldType collectionElement = beamField.getCollectionElementType();
for (Object v : values) {
ret.add(convertAvroFormat(collectionElement, v, options));
}
return ret;
}
private static Object convertAvroRecordToMap(
FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
List<GenericData.Record> records = (List<GenericData.Record>) value;
ImmutableMap.Builder<Object, Object> ret = ImmutableMap.builder();
FieldType keyElement = beamField.getMapKeyType();
FieldType valueElement = beamField.getMapValueType();
for (GenericData.Record record : records) {
ret.put(
convertAvroFormat(keyElement, record.get(0), options),
convertAvroFormat(valueElement, record.get(1), options));
}
return ret.build();
}
private static Object convertAvroPrimitiveTypes(TypeName beamType, Object value) {
switch (beamType) {
case BYTE:
return ((Long) value).byteValue();
case INT16:
return ((Long) value).shortValue();
case INT32:
return ((Long) value).intValue();
case INT64:
return value; // Long
case FLOAT:
return ((Double) value).floatValue();
case DOUBLE:
return value; // Double
case BOOLEAN:
return value; // Boolean
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type value");
case STRING:
return convertAvroString(value);
case BYTES:
return convertAvroBytes(value);
default:
        throw new RuntimeException(beamType + " is not a primitive type.");
}
}
private static Object convertAvroString(Object value) {
if (value == null) {
return null;
} else if (value instanceof Utf8) {
return ((Utf8) value).toString();
} else if (value instanceof String) {
return value;
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
private static Object convertAvroBytes(Object value) {
if (value == null) {
return null;
} else if (value instanceof ByteBuffer) {
ByteBuffer bf = (ByteBuffer) value;
byte[] result = new byte[bf.limit()];
bf.get(result);
return result;
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
private static Object convertAvroDate(Object value) {
if (value == null) {
return null;
} else if (value instanceof Integer) {
return LocalDate.ofEpochDay((Integer) value);
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
private static Object convertAvroTime(Object value) {
if (value == null) {
return null;
    } else if (value instanceof Long) {
      // Avro encodes BigQuery TIME as microseconds since midnight; LocalTime expects nanoseconds.
      return LocalTime.ofNanoOfDay((Long) value * 1000);
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
private static Object convertAvroDateTime(Object value) {
if (value == null) {
return null;
} else if (value instanceof Utf8) {
return LocalDateTime.parse(value.toString());
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
private static Object convertAvroNumeric(Object value) {
if (value == null) {
return null;
} else if (value instanceof ByteBuffer) {
// BigQuery NUMERIC type has precision 38 and scale 9
return new Conversions.DecimalConversion()
.fromBytes((ByteBuffer) value, null, LogicalTypes.decimal(38, 9));
} else {
throw new RuntimeException(
"Does not support converting avro format: " + value.getClass().getName());
}
}
/**
   * Parses a table identifier into a {@link TableReference}.
   *
   * @param fullTableId one of the forms commonly used to refer to BigQuery tables in the Beam
   *     codebase:
   *     <ul>
   *       <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
   *       <li>myproject:mydataset.mytable
   *       <li>myproject.mydataset.mytable
   *     </ul>
   *
   * @return a {@link TableReference} parsed from the given fullTableId, or null if it cannot be
   *     parsed.
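   *     <p>For example:
   *     <pre>{@code
   * TableReference ref = BigQueryUtils.toTableReference("myproject:mydataset.mytable");
   * // ref.getProjectId() -> "myproject", ref.getDatasetId() -> "mydataset",
   * // ref.getTableId() -> "mytable"
   * }</pre>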
*/
public static @Nullable TableReference toTableReference(String fullTableId) {
// Try parsing the format:
// "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
if (m.matches()) {
return new TableReference()
.setProjectId(m.group("PROJECT"))
.setDatasetId(m.group("DATASET"))
.setTableId(m.group("TABLE"));
}
// If that failed, try the format:
// "{project_id}:{dataset_id}.{table_id}" or
// "{project_id}.{dataset_id}.{table_id}"
m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
if (m.matches()) {
return new TableReference()
.setProjectId(m.group("PROJECT"))
.setDatasetId(m.group("DATASET"))
.setTableId(m.group("TABLE"));
}
return null;
}
private static ServiceCallMetric callMetricForMethod(
TableReference tableReference, String method) {
if (tableReference != null) {
// TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
// SpecMonitoringInfoValidator from dropping the MonitoringInfo.
      HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method);
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.bigQueryTable(
tableReference.getProjectId(),
tableReference.getDatasetId(),
tableReference.getTableId()));
baseLabels.put(
MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, tableReference.getProjectId());
baseLabels.put(
MonitoringInfoConstants.Labels.BIGQUERY_DATASET, tableReference.getDatasetId());
baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, tableReference.getTableId());
return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
}
return null;
}
/**
* @param tableReference - The table being read from. Can be a temporary BQ table used to read
* from a SQL query.
   * @return a ServiceCallMetric for recording the statuses of all BigQuery API responses related
   *     to reading elements directly from BigQuery in a process-wide metric, such as calls to
   *     readRows, splitReadStream, and createReadSession.
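   *     <p>A usage sketch ({@code tableRef} stands for any non-null {@link TableReference}):
   *     <pre>{@code
   * ServiceCallMetric metric = BigQueryUtils.readCallMetric(tableRef);
   * metric.call("ok"); // record a successful API response status
   * }</pre>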
*/
public static ServiceCallMetric readCallMetric(TableReference tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchRead");
}
/**
* @param tableReference - The table being written to.
   * @return a ServiceCallMetric for recording the statuses of all BigQuery API responses related
   *     to writing elements directly to BigQuery in a process-wide metric, such as calls to
   *     insertAll.
*/
public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}
}