blob: 52fd54dc557f3555f2a2d649d76b86d734a6d7c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verify;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verifyNotNull;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatterBuilder;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
* A set of utilities for working with Avro files.
*
* <p>These utilities are based on the <a href="https://avro.apache.org/docs/1.8.1/spec.html">Avro
* 1.8.1</a> specification.
*/
class BigQueryAvroUtils {
/**
* Defines the valid mapping between BigQuery types and native Avro types.
*
* <p>Some BigQuery types are duplicated here since slightly different Avro records are produced
* when exporting data in Avro format and when reading data directly using the read API.
*/
public static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
ImmutableMultimap.<String, Type>builder()
.put("STRING", Type.STRING)
.put("GEOGRAPHY", Type.STRING)
.put("BYTES", Type.BYTES)
.put("INTEGER", Type.LONG)
.put("FLOAT", Type.DOUBLE)
.put("NUMERIC", Type.BYTES)
.put("BOOLEAN", Type.BOOLEAN)
.put("TIMESTAMP", Type.LONG)
.put("RECORD", Type.RECORD)
.put("DATE", Type.STRING)
.put("DATE", Type.INT)
.put("DATETIME", Type.STRING)
.put("TIME", Type.STRING)
.put("TIME", Type.LONG)
.build();
/**
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
* immutable.
*/
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
@VisibleForTesting
static String formatTimestamp(Long timestampMicro) {
// timestampMicro is in "microseconds since epoch" format,
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
long timestampSec = timestampMicro / 1_000_000;
long micros = timestampMicro % 1_000_000;
if (micros < 0) {
micros += 1_000_000;
timestampSec -= 1;
}
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
if (micros == 0) {
return String.format("%s UTC", dayAndTime);
}
return String.format("%s.%06d UTC", dayAndTime, micros);
}
/**
* This method formats a BigQuery DATE value into a String matching the format used by JSON
* export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
* Gregorian calendar.
*/
private static String formatDate(int date) {
return LocalDate.ofEpochDay(date).format(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE);
}
private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MICROS =
new DateTimeFormatterBuilder()
.appendValue(HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(SECOND_OF_MINUTE, 2)
.appendLiteral('.')
.appendFraction(NANO_OF_SECOND, 6, 6, false)
.toFormatter();
private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MILLIS =
new DateTimeFormatterBuilder()
.appendValue(HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(SECOND_OF_MINUTE, 2)
.appendLiteral('.')
.appendFraction(NANO_OF_SECOND, 3, 3, false)
.toFormatter();
private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_SECONDS =
new DateTimeFormatterBuilder()
.appendValue(HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(SECOND_OF_MINUTE, 2)
.toFormatter();
/**
* This method formats a BigQuery TIME value into a String matching the format used by JSON
* export. Time records are stored in "microseconds since midnight" format.
*/
private static String formatTime(long timeMicros) {
java.time.format.DateTimeFormatter formatter;
if (timeMicros % 1000000 == 0) {
formatter = ISO_LOCAL_TIME_FORMATTER_SECONDS;
} else if (timeMicros % 1000 == 0) {
formatter = ISO_LOCAL_TIME_FORMATTER_MILLIS;
} else {
formatter = ISO_LOCAL_TIME_FORMATTER_MICROS;
}
return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter);
}
/**
* Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
*
* <p>See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">"Avro
* format"</a> for more information.
*/
static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
return convertGenericRecordToTableRow(record, schema.getFields());
}
private static TableRow convertGenericRecordToTableRow(
GenericRecord record, List<TableFieldSchema> fields) {
TableRow row = new TableRow();
for (TableFieldSchema subSchema : fields) {
// Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
// is required, so it may not be null.
Field field = record.getSchema().getField(subSchema.getName());
Object convertedValue =
getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
if (convertedValue != null) {
// To match the JSON files exported by BigQuery, do not include null values in the output.
row.set(field.name(), convertedValue);
}
}
return row;
}
@Nullable
private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
// Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
// is optional (and so it may be null), but defaults to "NULLABLE".
String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
switch (mode) {
case "REQUIRED":
return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
case "REPEATED":
return convertRepeatedField(schema, fieldSchema, v);
case "NULLABLE":
return convertNullableField(schema, fieldSchema, v);
default:
throw new UnsupportedOperationException(
"Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
}
}
private static List<Object> convertRepeatedField(
Schema schema, TableFieldSchema fieldSchema, Object v) {
Type arrayType = schema.getType();
verify(
arrayType == Type.ARRAY,
"BigQuery REPEATED field %s should be Avro ARRAY, not %s",
fieldSchema.getName(),
arrayType);
// REPEATED fields are represented as Avro arrays.
if (v == null) {
// Handle the case of an empty repeated field.
return new ArrayList<>();
}
@SuppressWarnings("unchecked")
List<Object> elements = (List<Object>) v;
ArrayList<Object> values = new ArrayList<>();
Type elementType = schema.getElementType().getType();
LogicalType elementLogicalType = schema.getElementType().getLogicalType();
for (Object element : elements) {
values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
}
return values;
}
private static Object convertRequiredField(
Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
// Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
// is required, so it may not be null.
String bqType = fieldSchema.getType();
ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
verify(
expectedAvroTypes.contains(avroType),
"Expected Avro schema types %s for BigQuery %s field %s, but received %s",
expectedAvroTypes,
bqType,
fieldSchema.getName(),
avroType);
// For historical reasons, don't validate avroLogicalType except for with NUMERIC.
// BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
switch (bqType) {
case "STRING":
case "DATETIME":
case "GEOGRAPHY":
// Avro will use a CharSequence to represent String objects, but it may not always use
// java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
return v.toString();
case "DATE":
if (avroType == Type.INT) {
verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
verifyNotNull(avroLogicalType, "Expected Date logical type");
verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
return formatDate((Integer) v);
} else {
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
return v.toString();
}
case "TIME":
if (avroType == Type.LONG) {
verify(v instanceof Long, "Expected Long, got %s", v.getClass());
verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
verify(
avroLogicalType instanceof LogicalTypes.TimeMicros,
"Expected TimeMicros logical type");
return formatTime((Long) v);
} else {
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
return v.toString();
}
case "INTEGER":
verify(v instanceof Long, "Expected Long, got %s", v.getClass());
return ((Long) v).toString();
case "FLOAT":
verify(v instanceof Double, "Expected Double, got %s", v.getClass());
return v;
case "NUMERIC":
// NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
// converted back to Strings with precision and scale determined by the logical type.
verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
verifyNotNull(avroLogicalType, "Expected Decimal logical type");
verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
BigDecimal numericValue =
new Conversions.DecimalConversion()
.fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
return numericValue.toString();
case "BOOLEAN":
verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
return v;
case "TIMESTAMP":
// TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
// Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
verify(v instanceof Long, "Expected Long, got %s", v.getClass());
return formatTimestamp((Long) v);
case "RECORD":
verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
case "BYTES":
verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
ByteBuffer byteBuffer = (ByteBuffer) v;
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
return BaseEncoding.base64().encode(bytes);
default:
throw new UnsupportedOperationException(
String.format(
"Unexpected BigQuery field schema type %s for field named %s",
fieldSchema.getType(), fieldSchema.getName()));
}
}
@Nullable
private static Object convertNullableField(
Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
// NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
verify(
avroSchema.getType() == Type.UNION,
"Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
avroSchema.getType(),
fieldSchema.getName());
List<Schema> unionTypes = avroSchema.getTypes();
verify(
unionTypes.size() == 2,
"BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
fieldSchema.getName(),
unionTypes);
if (v == null) {
return null;
}
Type firstType = unionTypes.get(0).getType();
if (!firstType.equals(Type.NULL)) {
return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v);
}
return convertRequiredField(
unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v);
}
static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
List<Field> avroFields = new ArrayList<>();
for (TableFieldSchema bigQueryField : fieldSchemas) {
avroFields.add(convertField(bigQueryField));
}
return Schema.createRecord(
schemaName,
"org.apache.beam.sdk.io.gcp.bigquery",
"Translated Avro Schema for " + schemaName,
false,
avroFields);
}
private static Field convertField(TableFieldSchema bigQueryField) {
Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()).iterator().next();
Schema elementSchema;
if (avroType == Type.RECORD) {
elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
} else {
elementSchema = Schema.create(avroType);
}
Schema fieldSchema;
if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
} else if ("REQUIRED".equals(bigQueryField.getMode())) {
fieldSchema = elementSchema;
} else if ("REPEATED".equals(bigQueryField.getMode())) {
fieldSchema = Schema.createArray(elementSchema);
} else {
throw new IllegalArgumentException(
String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
}
return new Field(
bigQueryField.getName(),
fieldSchema,
bigQueryField.getDescription(),
(Object) null /* Cast to avoid deprecated JsonNode constructor. */);
}
}