/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.values.Row.toRow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.StandardSQLTypeName;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
/** Utility methods for BigQuery-related operations. */
public class BigQueryUtils {
/**
* Options for how to convert BigQuery data to Beam data.
*
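* <p>A minimal usage sketch; the builder and enum are the API defined below, while consuming the
* options in an actual read is left to the caller:
*
* <pre>{@code
* ConversionOptions options =
*     ConversionOptions.builder()
*         .setTruncateTimestamps(TruncateTimestamps.TRUNCATE)
*         .build();
* }</pre>
*/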
@AutoValue
public abstract static class ConversionOptions implements Serializable {
/**
* Controls whether to truncate timestamps to millisecond precision lossily, or to reject
* timestamps whose conversion would lose sub-millisecond precision.
*/
public enum TruncateTimestamps {
/** Reject timestamps with greater-than-millisecond precision. */
REJECT,
/** Truncate timestamps to millisecond precision. */
TRUNCATE;
}
public abstract TruncateTimestamps getTruncateTimestamps();
public static Builder builder() {
return new AutoValue_BigQueryUtils_ConversionOptions.Builder()
.setTruncateTimestamps(TruncateTimestamps.REJECT);
}
/** Builder for {@link ConversionOptions}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTruncateTimestamps(TruncateTimestamps truncateTimestamps);
public abstract ConversionOptions build();
}
}
private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
ImmutableMap.<TypeName, StandardSQLTypeName>builder()
.put(TypeName.BYTE, StandardSQLTypeName.INT64)
.put(TypeName.INT16, StandardSQLTypeName.INT64)
.put(TypeName.INT32, StandardSQLTypeName.INT64)
.put(TypeName.INT64, StandardSQLTypeName.INT64)
.put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)
.put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)
.put(TypeName.DECIMAL, StandardSQLTypeName.NUMERIC)
.put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)
.put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)
.put(TypeName.ROW, StandardSQLTypeName.STRUCT)
.put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
.put(TypeName.STRING, StandardSQLTypeName.STRING)
.put(TypeName.BYTES, StandardSQLTypeName.BYTES)
.build();
private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =
ImmutableMap.<TypeName, Function<String, Object>>builder()
.put(TypeName.BYTE, Byte::valueOf)
.put(TypeName.INT16, Short::valueOf)
.put(TypeName.INT32, Integer::valueOf)
.put(TypeName.INT64, Long::valueOf)
.put(TypeName.FLOAT, Float::valueOf)
.put(TypeName.DOUBLE, Double::valueOf)
.put(TypeName.DECIMAL, BigDecimal::new)
.put(TypeName.BOOLEAN, Boolean::valueOf)
.put(TypeName.STRING, str -> str)
.put(
TypeName.DATETIME,
str ->
new DateTime(
(long) (Double.parseDouble(str) * 1000), ISOChronology.getInstanceUTC()))
.build();
// TODO: BigQuery code should not be relying on Calcite metadata fields. If it must, this
// mapping belongs in the SQL package.
private static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =
ImmutableMap.<String, StandardSQLTypeName>builder()
.put("SqlDateType", StandardSQLTypeName.DATE)
.put("SqlTimeType", StandardSQLTypeName.TIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
.put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)
.put("SqlCharType", StandardSQLTypeName.STRING)
.build();
/**
* Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Beam {@link
* FieldType}.
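*
* <p>For example, {@code FieldType.INT32} maps to {@code INT64}, and the SQL logical type
* {@code SqlDateType} maps to {@code DATE}.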
*/
private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
if (fieldType.getTypeName().isLogicalType()) {
StandardSQLTypeName foundType =
BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());
if (foundType != null) {
return foundType;
}
}
return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
}
private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());
for (Field schemaField : schema.getFields()) {
FieldType type = schemaField.getType();
TableFieldSchema field = new TableFieldSchema().setName(schemaField.getName());
if (schemaField.getDescription() != null && !"".equals(schemaField.getDescription())) {
field.setDescription(schemaField.getDescription());
}
if (!schemaField.getType().getNullable()) {
field.setMode(Mode.REQUIRED.toString());
}
if (TypeName.ARRAY == type.getTypeName()) {
type = type.getCollectionElementType();
if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");
}
field.setMode(Mode.REPEATED.toString());
}
if (TypeName.ROW == type.getTypeName()) {
Schema subType = type.getRowSchema();
field.setFields(toTableFieldSchema(subType));
}
if (TypeName.MAP == type.getTypeName()) {
throw new IllegalArgumentException("Maps are not supported in BigQuery.");
}
field.setType(toStandardSQLTypeName(type).toString());
fields.add(field);
}
return fields;
}
/**
* Convert a Beam {@link Schema} to a BigQuery {@link TableSchema}.
*
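* <p>For example (a sketch; the field names are illustrative and the single-argument {@code
* Schema.Builder} methods are assumed to be available in this version of the SDK):
*
* <pre>{@code
* Schema beamSchema =
*     Schema.builder().addInt64Field("id").addStringField("name").build();
* TableSchema bqSchema = BigQueryUtils.toTableSchema(beamSchema);
* }</pre>
*/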
public static TableSchema toTableSchema(Schema schema) {
return new TableSchema().setFields(toTableFieldSchema(schema));
}
private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
new ToTableRow(SerializableFunctions.identity());
/**
* Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
*
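* <p>For example, within a pipeline (a sketch; {@code rows} is an assumed {@code
* PCollection<Row>}):
*
* <pre>{@code
* PCollection<TableRow> tableRows =
*     rows.apply(
*         MapElements.into(TypeDescriptor.of(TableRow.class))
*             .via(BigQueryUtils.toTableRow()));
* }</pre>
*/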
public static SerializableFunction<Row, TableRow> toTableRow() {
return ROW_TO_TABLE_ROW;
}
/**
* Convert a value of any type to a BigQuery {@link TableRow}, using the given function to first
* convert the value to a Beam {@link Row}.
*/
public static <T> SerializableFunction<T, TableRow> toTableRow(
SerializableFunction<T, Row> toRow) {
return new ToTableRow<>(toRow);
}
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {
private final SerializableFunction<T, Row> toRow;
ToTableRow(SerializableFunction<T, Row> toRow) {
this.toRow = toRow;
}
@Override
public TableRow apply(T input) {
return toTableRow(toRow.apply(input));
}
}
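/**
* Convert an Avro {@link GenericRecord} to a Beam {@link Row}, converting each Avro field to the
* type demanded by the corresponding field of the given Beam {@link Schema}.
*/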
public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptions options) {
List<Object> valuesInOrder =
schema.getFields().stream()
.map(field -> convertAvroFormat(field, record.get(field.getName()), options))
.collect(toList());
return Row.withSchema(schema).addValues(valuesInOrder).build();
}
/**
* Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
*
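* <p>For example (a sketch; {@code schema} is an assumed Beam {@link Schema} with an INT64
* field followed by a STRING field):
*
* <pre>{@code
* Row row = Row.withSchema(schema).addValues(42L, "Alice").build();
* TableRow tableRow = BigQueryUtils.toTableRow(row);
* }</pre>
*/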
public static TableRow toTableRow(Row row) {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
Object value = row.getValue(i);
Field schemaField = row.getSchema().getField(i);
output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value));
}
return output;
}
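/**
* Convert a single Beam field value to its {@link TableRow} representation: nested rows become
* {@link TableRow}s, arrays are converted element by element, timestamps are formatted as
* ISO-8601 strings, numeric and boolean values are stringified, and bytes are base64-encoded.
*/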
private static Object fromBeamField(FieldType fieldType, Object fieldValue) {
if (fieldValue == null) {
if (!fieldType.getNullable()) {
throw new IllegalArgumentException("Field is not nullable.");
}
return null;
}
switch (fieldType.getTypeName()) {
case ARRAY:
FieldType elementType = fieldType.getCollectionElementType();
List items = (List) fieldValue;
List convertedItems = Lists.newArrayListWithCapacity(items.size());
for (Object item : items) {
convertedItems.add(fromBeamField(elementType, item));
}
return convertedItems;
case ROW:
return toTableRow((Row) fieldValue);
case DATETIME:
DateTimeFormatter patternFormat =
new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
.toFormatter();
return ((Instant) fieldValue).toDateTime().toString(patternFormat);
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case STRING:
case BOOLEAN:
case DECIMAL:
return fieldValue.toString();
case BYTES:
ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
return BaseEncoding.base64().encode(bytes);
default:
return fieldValue;
}
}
/**
* Tries to parse the JSON {@link TableRow} from BigQuery.
*
* <p>Only supports basic types and arrays. Doesn't support date types.
*/
public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jsonBqRow) {
List<TableFieldSchema> bqFields = bqSchema.getFields();
Map<String, Integer> bqFieldIndices =
IntStream.range(0, bqFields.size())
.boxed()
.collect(toMap(i -> bqFields.get(i).getName(), i -> i));
List<Object> rawJsonValues =
rowSchema.getFields().stream()
.map(field -> bqFieldIndices.get(field.getName()))
.map(index -> jsonBqRow.getF().get(index).getV())
.collect(toList());
return IntStream.range(0, rowSchema.getFieldCount())
.boxed()
.map(index -> toBeamValue(rowSchema.getField(index).getType(), rawJsonValues.get(index)))
.collect(toRow(rowSchema));
}
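/**
* Convert a single raw JSON value from a BigQuery response to the Beam type demanded by {@code
* fieldType}: strings are parsed with {@link #JSON_VALUE_PARSERS}, and lists are converted
* element by element.
*/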
private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) {
if (jsonBQValue instanceof String && JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {
return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply((String) jsonBQValue);
}
if (jsonBQValue instanceof List) {
return ((List<Object>) jsonBQValue)
.stream()
.map(v -> ((Map<String, Object>) v).get("v"))
.map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
.collect(toList());
}
throw new UnsupportedOperationException(
"Converting BigQuery type '"
+ jsonBQValue.getClass()
+ "' to '"
+ fieldType
+ "' is not supported");
}
// TODO: BigQuery shouldn't know about SQL internal logical types.
private static final Set<String> SQL_DATE_TIME_TYPES =
ImmutableSet.of(
"SqlDateType", "SqlTimeType", "SqlTimeWithLocalTzType", "SqlTimestampWithLocalTzType");
private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
/**
* Tries to convert an Avro decoded value to a Beam field value based on the target type of the
* Beam field.
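*
* <p>For example (a sketch; {@code timestampField} is an assumed {@link Field} of type {@code
* DATETIME}, and BigQuery TIMESTAMP values arrive from Avro as microseconds since the epoch):
*
* <pre>{@code
* Object value =
*     BigQueryUtils.convertAvroFormat(
*         timestampField, 1514764800000000L, ConversionOptions.builder().build());
* // value is the Instant 2018-01-01T00:00:00.000Z; an input with sub-millisecond
* // precision would throw under the default REJECT option.
* }</pre>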
*/
public static Object convertAvroFormat(
Field beamField, Object avroValue, BigQueryUtils.ConversionOptions options) {
TypeName beamFieldTypeName = beamField.getType().getTypeName();
switch (beamFieldTypeName) {
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case BYTE:
case BOOLEAN:
return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue);
case DATETIME:
// Expecting value in microseconds.
switch (options.getTruncateTimestamps()) {
case TRUNCATE:
return truncateToMillis(avroValue);
case REJECT:
return safeToMillis(avroValue);
default:
throw new IllegalArgumentException(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
case STRING:
return convertAvroPrimitiveTypes(beamFieldTypeName, avroValue);
case ARRAY:
return convertAvroArray(beamField, avroValue);
case LOGICAL_TYPE:
String identifier = beamField.getType().getLogicalType().getIdentifier();
if (SQL_DATE_TIME_TYPES.contains(identifier)) {
switch (options.getTruncateTimestamps()) {
case TRUNCATE:
return truncateToMillis(avroValue);
case REJECT:
return safeToMillis(avroValue);
default:
throw new IllegalArgumentException(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
} else if (SQL_STRING_TYPES.contains(identifier)) {
return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
} else {
throw new RuntimeException("Unknown logical type " + identifier);
}
case DECIMAL:
throw new RuntimeException("Converting DECIMAL type values is not supported");
case MAP:
throw new RuntimeException("Converting MAP type values is not supported");
default:
throw new RuntimeException(
"Converting values of unknown type is not supported: " + beamFieldTypeName);
}
}
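/**
* Convert a microseconds-since-epoch value to a millisecond {@link Instant}, throwing if the
* conversion would silently drop sub-millisecond precision.
*/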
private static ReadableInstant safeToMillis(Object value) {
long subMilliPrecision = ((long) value) % 1000;
if (subMilliPrecision != 0) {
throw new IllegalArgumentException(
String.format(
"BigQuery data contained value %s with sub-millisecond precision, which Beam does"
+ " not currently support."
+ " You can enable truncating timestamps to millisecond precision"
+ " by using BigQueryIO.withTruncatedTimestamps",
value));
} else {
return truncateToMillis(value);
}
}
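/** Truncate a microseconds-since-epoch value to a millisecond {@link Instant}. */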
private static ReadableInstant truncateToMillis(Object value) {
return new Instant((long) value / 1000);
}
private static Object convertAvroArray(Field beamField, Object value) {
// Convert each element of the Avro array to the Beam collection element type.
List<Object> values = (List<Object>) value;
List<Object> ret = new ArrayList<>(values.size());
for (Object v : values) {
ret.add(
convertAvroPrimitiveTypes(
beamField.getType().getCollectionElementType().getTypeName(), v));
}
return ret;
}
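/**
* Convert an Avro string to a Java {@link String}; Avro may decode strings as {@link
* org.apache.avro.util.Utf8}.
*/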
private static Object convertAvroString(Object value) {
if (value == null) {
return null;
} else if (value instanceof org.apache.avro.util.Utf8) {
return ((org.apache.avro.util.Utf8) value).toString();
} else if (value instanceof String) {
return value;
} else {
throw new RuntimeException(
"Cannot convert Avro string value of type: " + value.getClass().getName());
}
}
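/**
* Convert an Avro-decoded primitive to the requested Beam primitive type. Avro decodes BigQuery
* integer types as {@link Long} and floating-point types as {@link Double}, so narrowing
* conversions are applied where the Beam type is smaller.
*/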
private static Object convertAvroPrimitiveTypes(TypeName beamType, Object value) {
switch (beamType) {
case BYTE:
return ((Long) value).byteValue();
case INT16:
return ((Long) value).shortValue();
case INT32:
return ((Long) value).intValue();
case INT64:
return value;
case FLOAT:
return ((Double) value).floatValue();
case DOUBLE:
return (Double) value;
case BOOLEAN:
return (Boolean) value;
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type value");
case STRING:
return convertAvroString(value);
default:
throw new RuntimeException(beamType + " is not a primitive type.");
}
}
}