// blob: 0782622e1d6f6df96ab7d128964e5709e235420b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableSchema;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link BigQueryUtils}.
 *
 * <p>The fixtures below come in pairs: a Beam {@link Schema} or {@link Row}, and the BigQuery
 * {@link TableSchema} or {@link TableRow} that the tests expect it to convert to or from.
 */
@RunWith(JUnit4.class)
public class BigQueryUtilsTest {

  // ---------------------------------------------------------------------------------------------
  // Beam schemas
  // ---------------------------------------------------------------------------------------------

  /** Beam schema with one nullable field per scalar type exercised by these tests. */
  private static final Schema FLAT_TYPE =
      Schema.builder()
          .addNullableField("id", Schema.FieldType.INT64)
          .addNullableField("value", Schema.FieldType.DOUBLE)
          .addNullableField("name", Schema.FieldType.STRING)
          .addNullableField("timestamp_variant1", Schema.FieldType.DATETIME)
          .addNullableField("timestamp_variant2", Schema.FieldType.DATETIME)
          .addNullableField("timestamp_variant3", Schema.FieldType.DATETIME)
          .addNullableField("timestamp_variant4", Schema.FieldType.DATETIME)
          .addNullableField("datetime", Schema.FieldType.logicalType(SqlTypes.DATETIME))
          .addNullableField("datetime0ms", Schema.FieldType.logicalType(SqlTypes.DATETIME))
          .addNullableField("datetime0s_ns", Schema.FieldType.logicalType(SqlTypes.DATETIME))
          .addNullableField("datetime0s_0ns", Schema.FieldType.logicalType(SqlTypes.DATETIME))
          .addNullableField("date", Schema.FieldType.logicalType(SqlTypes.DATE))
          .addNullableField("time", Schema.FieldType.logicalType(SqlTypes.TIME))
          .addNullableField("time0ms", Schema.FieldType.logicalType(SqlTypes.TIME))
          .addNullableField("time0s_ns", Schema.FieldType.logicalType(SqlTypes.TIME))
          .addNullableField("time0s_0ns", Schema.FieldType.logicalType(SqlTypes.TIME))
          .addNullableField("valid", Schema.FieldType.BOOLEAN)
          .addNullableField("binary", Schema.FieldType.BYTES)
          .addNullableField("numeric", Schema.FieldType.DECIMAL)
          .build();

  private static final Schema ENUM_TYPE =
      Schema.builder()
          .addNullableField(
              "color", Schema.FieldType.logicalType(EnumerationType.create("RED", "GREEN", "BLUE")))
          .build();

  /** String-typed counterpart of {@link #ENUM_TYPE}, used when reading from BigQuery. */
  private static final Schema ENUM_STRING_TYPE =
      Schema.builder().addNullableField("color", Schema.FieldType.STRING).build();

  private static final Schema MAP_TYPE =
      Schema.builder().addStringField("key").addDoubleField("value").build();

  private static final Schema ARRAY_TYPE =
      Schema.builder().addArrayField("ids", Schema.FieldType.INT64).build();

  private static final Schema ROW_TYPE =
      Schema.builder().addNullableField("row", Schema.FieldType.row(FLAT_TYPE)).build();

  private static final Schema ARRAY_ROW_TYPE =
      Schema.builder().addArrayField("rows", Schema.FieldType.row(FLAT_TYPE)).build();

  private static final Schema MAP_ARRAY_TYPE =
      Schema.builder().addArrayField("map", Schema.FieldType.row(MAP_TYPE)).build();

  private static final Schema MAP_MAP_TYPE =
      Schema.builder().addMapField("map", Schema.FieldType.STRING, Schema.FieldType.DOUBLE).build();

  // ---------------------------------------------------------------------------------------------
  // BigQuery field schemas
  // ---------------------------------------------------------------------------------------------

  private static final TableFieldSchema ID =
      new TableFieldSchema().setName("id").setType(StandardSQLTypeName.INT64.toString());

  private static final TableFieldSchema VALUE =
      new TableFieldSchema().setName("value").setType(StandardSQLTypeName.FLOAT64.toString());

  private static final TableFieldSchema NAME =
      new TableFieldSchema().setName("name").setType(StandardSQLTypeName.STRING.toString());

  private static final TableFieldSchema TIMESTAMP_VARIANT1 =
      new TableFieldSchema()
          .setName("timestamp_variant1")
          .setType(StandardSQLTypeName.TIMESTAMP.toString());

  private static final TableFieldSchema TIMESTAMP_VARIANT2 =
      new TableFieldSchema()
          .setName("timestamp_variant2")
          .setType(StandardSQLTypeName.TIMESTAMP.toString());

  private static final TableFieldSchema TIMESTAMP_VARIANT3 =
      new TableFieldSchema()
          .setName("timestamp_variant3")
          .setType(StandardSQLTypeName.TIMESTAMP.toString());

  private static final TableFieldSchema TIMESTAMP_VARIANT4 =
      new TableFieldSchema()
          .setName("timestamp_variant4")
          .setType(StandardSQLTypeName.TIMESTAMP.toString());

  private static final TableFieldSchema DATETIME =
      new TableFieldSchema().setName("datetime").setType(StandardSQLTypeName.DATETIME.toString());

  private static final TableFieldSchema DATETIME_0MS =
      new TableFieldSchema()
          .setName("datetime0ms")
          .setType(StandardSQLTypeName.DATETIME.toString());

  private static final TableFieldSchema DATETIME_0S_NS =
      new TableFieldSchema()
          .setName("datetime0s_ns")
          .setType(StandardSQLTypeName.DATETIME.toString());

  private static final TableFieldSchema DATETIME_0S_0NS =
      new TableFieldSchema()
          .setName("datetime0s_0ns")
          .setType(StandardSQLTypeName.DATETIME.toString());

  private static final TableFieldSchema DATE =
      new TableFieldSchema().setName("date").setType(StandardSQLTypeName.DATE.toString());

  private static final TableFieldSchema TIME =
      new TableFieldSchema().setName("time").setType(StandardSQLTypeName.TIME.toString());

  private static final TableFieldSchema TIME_0MS =
      new TableFieldSchema().setName("time0ms").setType(StandardSQLTypeName.TIME.toString());

  private static final TableFieldSchema TIME_0S_NS =
      new TableFieldSchema().setName("time0s_ns").setType(StandardSQLTypeName.TIME.toString());

  private static final TableFieldSchema TIME_0S_0NS =
      new TableFieldSchema().setName("time0s_0ns").setType(StandardSQLTypeName.TIME.toString());

  private static final TableFieldSchema VALID =
      new TableFieldSchema().setName("valid").setType(StandardSQLTypeName.BOOL.toString());

  private static final TableFieldSchema BINARY =
      new TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString());

  private static final TableFieldSchema NUMERIC =
      new TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString());

  private static final TableFieldSchema COLOR =
      new TableFieldSchema().setName("color").setType(StandardSQLTypeName.STRING.toString());

  private static final TableFieldSchema IDS =
      new TableFieldSchema()
          .setName("ids")
          .setType(StandardSQLTypeName.INT64.toString())
          .setMode(Mode.REPEATED.toString());

  private static final TableFieldSchema MAP_KEY =
      new TableFieldSchema()
          .setName("key")
          .setType(StandardSQLTypeName.STRING.toString())
          .setMode(Mode.REQUIRED.toString());

  private static final TableFieldSchema MAP_VALUE =
      new TableFieldSchema()
          .setName("value")
          .setType(StandardSQLTypeName.FLOAT64.toString())
          .setMode(Mode.REQUIRED.toString());

  // NOTE: ROW, ROWS and BQ_FLAT_TYPE call flatFields(), which reads the field constants above, so
  // they must stay declared after those constants.
  private static final TableFieldSchema ROW =
      new TableFieldSchema()
          .setName("row")
          .setType(StandardSQLTypeName.STRUCT.toString())
          .setMode(Mode.NULLABLE.toString())
          .setFields(flatFields());

  private static final TableFieldSchema ROWS =
      new TableFieldSchema()
          .setName("rows")
          .setType(StandardSQLTypeName.STRUCT.toString())
          .setMode(Mode.REPEATED.toString())
          .setFields(flatFields());

  private static final TableFieldSchema MAP =
      new TableFieldSchema()
          .setName("map")
          .setType(StandardSQLTypeName.STRUCT.toString())
          .setMode(Mode.REPEATED.toString())
          .setFields(Arrays.asList(MAP_KEY, MAP_VALUE));

  // ---------------------------------------------------------------------------------------------
  // Beam rows and the BigQuery rows they correspond to
  // ---------------------------------------------------------------------------------------------

  // Make sure that chosen BYTES test value is the same after a full base64 round trip.
  private static final Row FLAT_ROW =
      Row.withSchema(FLAT_TYPE)
          .addValues(
              123L,
              123.456,
              "test",
              ISODateTimeFormat.dateHourMinuteSecondFraction()
                  .withZoneUTC()
                  .parseDateTime("2019-08-16T13:52:07.000"),
              ISODateTimeFormat.dateHourMinuteSecondFraction()
                  .withZoneUTC()
                  .parseDateTime("2019-08-17T14:52:07.123"),
              ISODateTimeFormat.dateHourMinuteSecondFraction()
                  .withZoneUTC()
                  .parseDateTime("2019-08-18T15:52:07.123"),
              new DateTime(123456),
              LocalDateTime.parse("2020-11-02T12:34:56.789876"),
              LocalDateTime.parse("2020-11-02T12:34:56"),
              LocalDateTime.parse("2020-11-02T12:34:00.789876"),
              LocalDateTime.parse("2020-11-02T12:34"),
              LocalDate.parse("2020-11-02"),
              LocalTime.parse("12:34:56.789876"),
              LocalTime.parse("12:34:56"),
              LocalTime.parse("12:34:00.789876"),
              LocalTime.parse("12:34"),
              false,
              Base64.getDecoder().decode("ABCD1234"),
              // Built from a String so the value is exactly 123.456; the double constructor would
              // start from the inexact binary expansion of the literal.
              new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP))
          .build();

  private static final TableRow BQ_FLAT_ROW =
      new TableRow()
          .set("id", "123")
          .set("value", "123.456")
          .set("name", "test")
          .set("timestamp_variant1", "2019-08-16 13:52:07 UTC")
          .set("timestamp_variant2", "2019-08-17 14:52:07.123 UTC")
          // We'll lose precision here, but this is a format BigQuery can output.
          .set("timestamp_variant3", "2019-08-18 15:52:07.123456 UTC")
          .set(
              "timestamp_variant4",
              String.valueOf(
                  new DateTime(123456L, ISOChronology.getInstanceUTC()).getMillis() / 1000.0D))
          .set("datetime", "2020-11-02T12:34:56.789876")
          .set("datetime0ms", "2020-11-02T12:34:56")
          .set("datetime0s_ns", "2020-11-02T12:34:00.789876")
          .set("datetime0s_0ns", "2020-11-02T12:34:00")
          .set("date", "2020-11-02")
          .set("time", "12:34:56.789876")
          .set("time0ms", "12:34:56")
          .set("time0s_ns", "12:34:00.789876")
          .set("time0s_0ns", "12:34:00")
          .set("valid", "false")
          .set("binary", "ABCD1234")
          .set("numeric", "123.456");

  /** A {@link #FLAT_TYPE} row with every one of its 19 fields set to {@code null}. */
  private static final Row NULL_FLAT_ROW =
      Row.withSchema(FLAT_TYPE)
          .addValues(
              null, null, null, null, null, null, null, null, null, null, null, null, null, null,
              null, null, null, null, null)
          .build();

  private static final TableRow BQ_NULL_FLAT_ROW =
      new TableRow()
          .set("id", null)
          .set("value", null)
          .set("name", null)
          .set("timestamp_variant1", null)
          .set("timestamp_variant2", null)
          .set("timestamp_variant3", null)
          .set("timestamp_variant4", null)
          .set("datetime", null)
          .set("datetime0ms", null)
          .set("datetime0s_ns", null)
          .set("datetime0s_0ns", null)
          .set("date", null)
          .set("time", null)
          .set("time0ms", null)
          .set("time0s_ns", null)
          .set("time0s_0ns", null)
          .set("valid", null)
          .set("binary", null)
          .set("numeric", null);

  private static final Row ENUM_ROW =
      Row.withSchema(ENUM_TYPE).addValues(new EnumerationType.Value(1)).build();

  private static final Row ENUM_STRING_ROW =
      Row.withSchema(ENUM_STRING_TYPE).addValues("GREEN").build();

  private static final TableRow BQ_ENUM_ROW = new TableRow().set("color", "GREEN");

  private static final Row ARRAY_ROW =
      Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();

  private static final Row MAP_ROW =
      Row.withSchema(MAP_MAP_TYPE).addValues(ImmutableMap.of("test", 123.456)).build();

  private static final TableRow BQ_ARRAY_ROW =
      new TableRow()
          .set(
              "ids",
              Arrays.asList(
                  Collections.singletonMap("v", "123"), Collections.singletonMap("v", "124")));

  private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();

  private static final TableRow BQ_ROW_ROW = new TableRow().set("row", BQ_FLAT_ROW);

  private static final Row ARRAY_ROW_ROW =
      Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();

  private static final TableRow BQ_ARRAY_ROW_ROW =
      new TableRow()
          .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));

  // ---------------------------------------------------------------------------------------------
  // BigQuery table schemas
  // ---------------------------------------------------------------------------------------------

  private static final TableSchema BQ_FLAT_TYPE = new TableSchema().setFields(flatFields());

  private static final TableSchema BQ_ENUM_TYPE = new TableSchema().setFields(Arrays.asList(COLOR));

  private static final TableSchema BQ_ARRAY_TYPE = new TableSchema().setFields(Arrays.asList(IDS));

  private static final TableSchema BQ_ROW_TYPE = new TableSchema().setFields(Arrays.asList(ROW));

  private static final TableSchema BQ_ARRAY_ROW_TYPE =
      new TableSchema().setFields(Arrays.asList(ROWS));

  private static final TableSchema BQ_MAP_TYPE = new TableSchema().setFields(Arrays.asList(MAP));

  // ---------------------------------------------------------------------------------------------
  // Schemas used by the Avro record conversion tests
  // ---------------------------------------------------------------------------------------------

  private static final Schema AVRO_FLAT_TYPE =
      Schema.builder()
          .addNullableField("id", Schema.FieldType.INT64)
          .addNullableField("value", Schema.FieldType.DOUBLE)
          .addNullableField("name", Schema.FieldType.STRING)
          .addNullableField("valid", Schema.FieldType.BOOLEAN)
          .build();

  private static final Schema AVRO_ARRAY_TYPE =
      Schema.builder().addArrayField("rows", Schema.FieldType.row(AVRO_FLAT_TYPE)).build();

  private static final Schema AVRO_ARRAY_ARRAY_TYPE =
      Schema.builder().addArrayField("array_rows", Schema.FieldType.row(AVRO_ARRAY_TYPE)).build();

  // ---------------------------------------------------------------------------------------------
  // Conversion options
  // ---------------------------------------------------------------------------------------------

  private static final BigQueryUtils.ConversionOptions TRUNCATE_OPTIONS =
      BigQueryUtils.ConversionOptions.builder()
          .setTruncateTimestamps(TruncateTimestamps.TRUNCATE)
          .build();

  private static final BigQueryUtils.ConversionOptions REJECT_OPTIONS =
      BigQueryUtils.ConversionOptions.builder()
          .setTruncateTimestamps(TruncateTimestamps.REJECT)
          .build();

  private static final BigQueryUtils.SchemaConversionOptions INFER_MAPS_OPTIONS =
      BigQueryUtils.SchemaConversionOptions.builder().setInferMaps(true).build();

  // ---------------------------------------------------------------------------------------------
  // Shared helpers
  // ---------------------------------------------------------------------------------------------

  /**
   * Returns a new list of the BigQuery field schemas that mirror {@link #FLAT_TYPE}, in the same
   * order as the Beam fields. A fresh list is built on every call so no two schemas share one.
   */
  private static List<TableFieldSchema> flatFields() {
    return Arrays.asList(
        ID,
        VALUE,
        NAME,
        TIMESTAMP_VARIANT1,
        TIMESTAMP_VARIANT2,
        TIMESTAMP_VARIANT3,
        TIMESTAMP_VARIANT4,
        DATETIME,
        DATETIME_0MS,
        DATETIME_0S_NS,
        DATETIME_0S_0NS,
        DATE,
        TIME,
        TIME_0MS,
        TIME_0S_NS,
        TIME_0S_0NS,
        VALID,
        BINARY,
        NUMERIC);
  }

  /** Asserts that {@code actual} holds exactly the fields of {@link #flatFields()}, any order. */
  private static void assertHasFlatFields(List<TableFieldSchema> actual) {
    // The array form is required: containsInAnyOrder has no overload taking a collection of
    // items, so a List argument would be treated as one single expected element.
    assertThat(actual, containsInAnyOrder(flatFields().toArray(new TableFieldSchema[0])));
  }

  /**
   * Asserts that {@code row} is the table-row form of {@link #FLAT_ROW}: 19 entries, with the
   * non-timestamp entries checked against their expected string values.
   */
  private static void assertFlatTableRow(TableRow row) {
    assertThat(row.size(), equalTo(19));
    assertThat(row, hasEntry("id", "123"));
    assertThat(row, hasEntry("value", "123.456"));
    assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
    assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
    assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
    assertThat(row, hasEntry("datetime0s_0ns", "2020-11-02T12:34:00"));
    assertThat(row, hasEntry("date", "2020-11-02"));
    assertThat(row, hasEntry("time", "12:34:56.789876"));
    assertThat(row, hasEntry("time0ms", "12:34:56"));
    assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
    assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
    assertThat(row, hasEntry("name", "test"));
    assertThat(row, hasEntry("valid", "false"));
    assertThat(row, hasEntry("binary", "ABCD1234"));
    assertThat(row, hasEntry("numeric", "123.456"));
  }

  /** Builds the Avro record counterpart of {@link #expectedAvroFlatRow()}. */
  private static GenericData.Record avroFlatRecord() {
    GenericData.Record flat = new GenericData.Record(AvroUtils.toAvroSchema(AVRO_FLAT_TYPE));
    flat.put("id", 123L);
    flat.put("value", 123.456);
    flat.put("name", "test");
    flat.put("valid", false);
    return flat;
  }

  /** Builds the Beam row the tests expect {@link #avroFlatRecord()} to convert to. */
  private static Row expectedAvroFlatRow() {
    return Row.withSchema(AVRO_FLAT_TYPE).addValues(123L, 123.456, "test", false).build();
  }

  /** Asserts that {@code spec} parses into a reference with the given project/dataset/table. */
  private static void assertTableReference(
      String spec, String projectId, String datasetId, String tableId) {
    TableReference ref = BigQueryUtils.toTableReference(spec);
    assertEquals(projectId, ref.getProjectId());
    assertEquals(datasetId, ref.getDatasetId());
    assertEquals(tableId, ref.getTableId());
  }

  // ---------------------------------------------------------------------------------------------
  // Beam Schema -> BigQuery TableSchema
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testToTableSchema_flat() {
    TableSchema schema = toTableSchema(FLAT_TYPE);
    assertHasFlatFields(schema.getFields());
  }

  @Test
  public void testToTableSchema_enum() {
    TableSchema schema = toTableSchema(ENUM_TYPE);
    assertThat(schema.getFields(), containsInAnyOrder(COLOR));
  }

  @Test
  public void testToTableSchema_array() {
    TableSchema schema = toTableSchema(ARRAY_TYPE);
    assertThat(schema.getFields(), contains(IDS));
  }

  @Test
  public void testToTableSchema_row() {
    TableSchema schema = toTableSchema(ROW_TYPE);
    assertThat(schema.getFields().size(), equalTo(1));

    TableFieldSchema field = schema.getFields().get(0);
    assertThat(field.getName(), equalTo("row"));
    assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
    assertThat(field.getMode(), nullValue());
    assertHasFlatFields(field.getFields());
  }

  @Test
  public void testToTableSchema_array_row() {
    TableSchema schema = toTableSchema(ARRAY_ROW_TYPE);
    assertThat(schema.getFields().size(), equalTo(1));

    TableFieldSchema field = schema.getFields().get(0);
    assertThat(field.getName(), equalTo("rows"));
    assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
    assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
    assertHasFlatFields(field.getFields());
  }

  @Test
  public void testToTableSchema_map() {
    TableSchema schema = toTableSchema(MAP_MAP_TYPE);
    assertThat(schema.getFields().size(), equalTo(1));

    TableFieldSchema field = schema.getFields().get(0);
    assertThat(field.getName(), equalTo("map"));
    assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
    assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
    assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE));
  }

  // ---------------------------------------------------------------------------------------------
  // Beam Row -> BigQuery TableRow
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testToTableRow_flat() {
    assertFlatTableRow(toTableRow().apply(FLAT_ROW));
  }

  @Test
  public void testToTableRow_enum() {
    TableRow row = toTableRow().apply(ENUM_ROW);
    assertThat(row.size(), equalTo(1));
    assertThat(row, hasEntry("color", "GREEN"));
  }

  @Test
  public void testToTableRow_array() {
    TableRow row = toTableRow().apply(ARRAY_ROW);
    assertThat(row, hasEntry("ids", Arrays.asList("123", "124")));
    assertThat(row.size(), equalTo(1));
  }

  @Test
  public void testToTableRow_map() {
    TableRow row = toTableRow().apply(MAP_ROW);
    assertThat(row.size(), equalTo(1));

    @SuppressWarnings("unchecked") // the test expects the map as a list of key/value rows
    List<TableRow> entries = (List<TableRow>) row.get("map");
    TableRow entry = entries.get(0);
    assertThat(entry.size(), equalTo(2));
    assertThat(entry, hasEntry("key", "test"));
    assertThat(entry, hasEntry("value", "123.456"));
  }

  @Test
  public void testToTableRow_row() {
    TableRow row = toTableRow().apply(ROW_ROW);
    assertThat(row.size(), equalTo(1));
    assertFlatTableRow((TableRow) row.get("row"));
  }

  @Test
  public void testToTableRow_array_row() {
    TableRow row = toTableRow().apply(ARRAY_ROW_ROW);
    assertThat(row.size(), equalTo(1));

    @SuppressWarnings("unchecked") // the test expects the array as a list of nested rows
    List<TableRow> nested = (List<TableRow>) row.get("rows");
    assertFlatTableRow(nested.get(0));
  }

  @Test
  public void testToTableRow_null_row() {
    TableRow row = toTableRow().apply(NULL_FLAT_ROW);
    assertThat(row.size(), equalTo(19));
    assertThat(row, hasEntry("id", null));
    assertThat(row, hasEntry("value", null));
    assertThat(row, hasEntry("name", null));
    assertThat(row, hasEntry("timestamp_variant1", null));
    assertThat(row, hasEntry("timestamp_variant2", null));
    assertThat(row, hasEntry("timestamp_variant3", null));
    assertThat(row, hasEntry("timestamp_variant4", null));
    assertThat(row, hasEntry("datetime", null));
    assertThat(row, hasEntry("datetime0ms", null));
    assertThat(row, hasEntry("datetime0s_ns", null));
    assertThat(row, hasEntry("datetime0s_0ns", null));
    assertThat(row, hasEntry("date", null));
    assertThat(row, hasEntry("time", null));
    assertThat(row, hasEntry("time0ms", null));
    assertThat(row, hasEntry("time0s_ns", null));
    assertThat(row, hasEntry("time0s_0ns", null));
    assertThat(row, hasEntry("valid", null));
    assertThat(row, hasEntry("binary", null));
    assertThat(row, hasEntry("numeric", null));
  }

  // ---------------------------------------------------------------------------------------------
  // Avro value -> Beam value
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testSubMilliPrecisionRejected() {
    // The String overload of assertThrows only sets the failure message; it does not inspect the
    // thrown exception. Capture the exception and check its message explicitly instead.
    IllegalArgumentException thrown =
        assertThrows(
            IllegalArgumentException.class,
            () ->
                BigQueryUtils.convertAvroFormat(
                    FieldType.DATETIME, 1000000001L, REJECT_OPTIONS));
    assertThat(thrown.getMessage(), org.hamcrest.Matchers.containsString("precision"));
  }

  @Test
  public void testMilliPrecisionOk() {
    long millis = 123456789L;
    assertThat(
        BigQueryUtils.convertAvroFormat(FieldType.DATETIME, millis * 1000, REJECT_OPTIONS),
        equalTo(new Instant(millis)));
  }

  @Test
  public void testSubMilliPrecisionTruncated() {
    long millis = 123456789L;
    assertThat(
        BigQueryUtils.convertAvroFormat(FieldType.DATETIME, millis * 1000 + 123, TRUNCATE_OPTIONS),
        equalTo(new Instant(millis)));
  }

  @Test
  public void testDateType() {
    LocalDate d = LocalDate.parse("2020-06-04");
    assertThat(
        BigQueryUtils.convertAvroFormat(
            FieldType.logicalType(SqlTypes.DATE), (int) d.toEpochDay(), REJECT_OPTIONS),
        equalTo(d));
  }

  @Test
  public void testMicroPrecisionTimeType() {
    LocalTime t = LocalTime.parse("12:34:56.789876");
    assertThat(
        BigQueryUtils.convertAvroFormat(
            FieldType.logicalType(SqlTypes.TIME), t.toNanoOfDay() / 1000, REJECT_OPTIONS),
        equalTo(t));
  }

  @Test
  public void testMicroPrecisionDateTimeType() {
    LocalDateTime dt = LocalDateTime.parse("2020-06-04T12:34:56.789876");
    assertThat(
        BigQueryUtils.convertAvroFormat(
            FieldType.logicalType(SqlTypes.DATETIME), new Utf8(dt.toString()), REJECT_OPTIONS),
        equalTo(dt));
  }

  @Test
  public void testNumericType() {
    // BigQuery NUMERIC type has precision 38 and scale 9
    BigDecimal n = new BigDecimal("123456789.987654321").setScale(9);
    assertThat(
        BigQueryUtils.convertAvroFormat(
            FieldType.DECIMAL,
            new Conversions.DecimalConversion().toBytes(n, null, LogicalTypes.decimal(38, 9)),
            REJECT_OPTIONS),
        equalTo(n));
  }

  @Test
  public void testBytesType() {
    byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
    assertThat(
        BigQueryUtils.convertAvroFormat(FieldType.BYTES, ByteBuffer.wrap(bytes), REJECT_OPTIONS),
        equalTo(bytes));
  }

  // ---------------------------------------------------------------------------------------------
  // BigQuery TableSchema -> Beam Schema
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testFromTableSchema_flat() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_FLAT_TYPE);
    assertEquals(FLAT_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_enum() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ENUM_TYPE);
    assertEquals(ENUM_STRING_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_array() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_TYPE);
    assertEquals(ARRAY_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_row() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ROW_TYPE);
    assertEquals(ROW_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_array_row() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_ROW_TYPE);
    assertEquals(ARRAY_ROW_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_map_array() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE);
    assertEquals(MAP_ARRAY_TYPE, beamSchema);
  }

  @Test
  public void testFromTableSchema_map_map() {
    Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_MAP_TYPE, INFER_MAPS_OPTIONS);
    assertEquals(MAP_MAP_TYPE, beamSchema);
  }

  // ---------------------------------------------------------------------------------------------
  // BigQuery TableRow / Avro record -> Beam Row
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testToBeamRow_flat() {
    Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW);
    assertEquals(FLAT_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_null() {
    Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_NULL_FLAT_ROW);
    assertEquals(NULL_FLAT_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_enum() {
    Row beamRow = BigQueryUtils.toBeamRow(ENUM_STRING_TYPE, BQ_ENUM_ROW);
    assertEquals(ENUM_STRING_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_array() {
    Row beamRow = BigQueryUtils.toBeamRow(ARRAY_TYPE, BQ_ARRAY_ROW);
    assertEquals(ARRAY_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_row() {
    Row beamRow = BigQueryUtils.toBeamRow(ROW_TYPE, BQ_ROW_ROW);
    assertEquals(ROW_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_array_row() {
    Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
    assertEquals(ARRAY_ROW_ROW, beamRow);
  }

  @Test
  public void testToBeamRow_avro_array_row() {
    Row expected =
        Row.withSchema(AVRO_ARRAY_TYPE)
            .addValues((Object) Arrays.asList(expectedAvroFlatRow()))
            .build();

    GenericData.Record record = new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
    record.put("rows", Arrays.asList(avroFlatRecord()));

    Row beamRow =
        BigQueryUtils.toBeamRow(
            record, AVRO_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
    assertEquals(expected, beamRow);
  }

  @Test
  public void testToBeamRow_avro_array_array_row() {
    Row arrayRowExpected =
        Row.withSchema(AVRO_ARRAY_TYPE)
            .addValues((Object) Arrays.asList(expectedAvroFlatRow()))
            .build();
    Row expected =
        Row.withSchema(AVRO_ARRAY_ARRAY_TYPE)
            .addValues((Object) Arrays.asList(arrayRowExpected))
            .build();

    GenericData.Record arrayRecord =
        new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_TYPE));
    arrayRecord.put("rows", Arrays.asList(avroFlatRecord()));
    GenericData.Record record =
        new GenericData.Record(AvroUtils.toAvroSchema(AVRO_ARRAY_ARRAY_TYPE));
    record.put("array_rows", Arrays.asList(arrayRecord));

    Row beamRow =
        BigQueryUtils.toBeamRow(
            record, AVRO_ARRAY_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
    assertEquals(expected, beamRow);
  }

  // ---------------------------------------------------------------------------------------------
  // Table reference parsing
  // ---------------------------------------------------------------------------------------------

  @Test
  public void testToTableReference() {
    assertTableReference(
        "projects/myproject/datasets/mydataset/tables/mytable",
        "myproject",
        "mydataset",
        "mytable");
    // Test colon(":") after project format
    assertTableReference(
        "myprojectwithcolon:mydataset.mytable", "myprojectwithcolon", "mydataset", "mytable");
    // Test dot(".") after project format
    assertTableReference(
        "myprojectwithdot.mydataset.mytable", "myprojectwithdot", "mydataset", "mytable");

    // Invalid scenarios: each of these is expected to yield null rather than a reference.
    String[] invalidSpecs = {
      "",
      ":.",
      "..",
      "myproject",
      "myproject:",
      "myproject.",
      "myproject:mydataset",
      "myproject:mydataset.",
      "myproject:mydataset.mytable.",
      "myproject:mydataset:mytable:",
      ".invalidleadingdot.mydataset.mytable",
      "invalidtrailingdot.mydataset.mytable.",
      ":invalidleadingcolon.mydataset.mytable",
      "invalidtrailingcolon.mydataset.mytable:",
      "myproject.mydataset.mytable.myinvalidpart",
      "myproject:mydataset.mytable.myinvalidpart",
      "/projects/extraslash/datasets/mydataset/tables/mytable",
      "projects//extraslash/datasets/mydataset/tables/mytable",
      "projects/extraslash//datasets/mydataset/tables/mytable",
      "projects/extraslash/datasets//mydataset/tables/mytable",
      "projects/extraslash/datasets/mydataset//tables/mytable",
      "projects/extraslash/datasets/mydataset/tables//mytable",
      "projects/extraslash/datasets/mydataset/tables/mytable/",
      "projects/myproject/datasets/mydataset/tables//",
      "projects/myproject/datasets//tables/mytable/",
      "projects//datasets/mydataset/tables/mytable/",
      "projects//datasets//tables//",
      "projects/myproject/datasets/mydataset/tables/mytable/",
      "projects/myproject/datasets/mydataset/tables/",
      "projects/myproject/datasets/mydataset/tables",
      "projects/myproject/datasets/mydataset/",
      "projects/myproject/datasets/mydataset",
      "projects/myproject/datasets/",
      "projects/myproject/datasets",
      "projects/myproject/",
      "projects/myproject",
      "projects/",
      "projects",
    };
    for (String spec : invalidSpecs) {
      // Name the offending input so a failure points straight at the bad case.
      assertNull(
          "Expected no table reference for \"" + spec + "\"",
          BigQueryUtils.toTableReference(spec));
    }
  }
}