blob: a23a3ec168ce1fb1c5b5f294c50978c33abb80d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableSchema;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link BigQueryUtils}. */
@RunWith(JUnit4.class)
public class BigQueryUtilsTest {
private static final Schema FLAT_TYPE =
Schema.builder()
.addNullableField("id", Schema.FieldType.INT64)
.addNullableField("value", Schema.FieldType.DOUBLE)
.addNullableField("name", Schema.FieldType.STRING)
.addNullableField("timestamp", Schema.FieldType.DATETIME)
.addNullableField("valid", Schema.FieldType.BOOLEAN)
.build();
private static final Schema ARRAY_TYPE =
Schema.builder().addArrayField("ids", Schema.FieldType.INT64).build();
private static final Schema ROW_TYPE =
Schema.builder().addNullableField("row", Schema.FieldType.row(FLAT_TYPE)).build();
private static final Schema ARRAY_ROW_TYPE =
Schema.builder().addArrayField("rows", Schema.FieldType.row(FLAT_TYPE)).build();
private static final TableFieldSchema ID =
new TableFieldSchema().setName("id").setType(StandardSQLTypeName.INT64.toString());
private static final TableFieldSchema VALUE =
new TableFieldSchema().setName("value").setType(StandardSQLTypeName.FLOAT64.toString());
private static final TableFieldSchema NAME =
new TableFieldSchema().setName("name").setType(StandardSQLTypeName.STRING.toString());
private static final TableFieldSchema TIMESTAMP =
new TableFieldSchema().setName("timestamp").setType(StandardSQLTypeName.TIMESTAMP.toString());
private static final TableFieldSchema VALID =
new TableFieldSchema().setName("valid").setType(StandardSQLTypeName.BOOL.toString());
private static final TableFieldSchema IDS =
new TableFieldSchema()
.setName("ids")
.setType(StandardSQLTypeName.INT64.toString())
.setMode(Mode.REPEATED.toString());
private static final Row FLAT_ROW =
Row.withSchema(FLAT_TYPE)
.addValues(123L, 123.456, "test", new DateTime(123456), false)
.build();
private static final Row NULL_FLAT_ROW =
Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null).build();
private static final Row ARRAY_ROW =
Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();
private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();
private static final Row ARRAY_ROW_ROW =
Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
@Test
public void testToTableSchema_flat() {
TableSchema schema = toTableSchema(FLAT_TYPE);
assertThat(schema.getFields(), containsInAnyOrder(ID, VALUE, NAME, TIMESTAMP, VALID));
}
@Test
public void testToTableSchema_array() {
TableSchema schema = toTableSchema(ARRAY_TYPE);
assertThat(schema.getFields(), contains(IDS));
}
@Test
public void testToTableSchema_row() {
TableSchema schema = toTableSchema(ROW_TYPE);
assertThat(schema.getFields().size(), equalTo(1));
TableFieldSchema field = schema.getFields().get(0);
assertThat(field.getName(), equalTo("row"));
assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), nullValue());
assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, TIMESTAMP, VALID));
}
@Test
public void testToTableSchema_array_row() {
TableSchema schema = toTableSchema(ARRAY_ROW_TYPE);
assertThat(schema.getFields().size(), equalTo(1));
TableFieldSchema field = schema.getFields().get(0);
assertThat(field.getName(), equalTo("rows"));
assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, TIMESTAMP, VALID));
}
@Test
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);
assertThat(row.size(), equalTo(5));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
}
@Test
public void testToTableRow_array() {
TableRow row = toTableRow().apply(ARRAY_ROW);
assertThat(row, hasEntry("ids", Arrays.asList("123", "124")));
assertThat(row.size(), equalTo(1));
}
@Test
public void testToTableRow_row() {
TableRow row = toTableRow().apply(ROW_ROW);
assertThat(row.size(), equalTo(1));
row = (TableRow) row.get("row");
assertThat(row.size(), equalTo(5));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
}
@Test
public void testToTableRow_array_row() {
TableRow row = toTableRow().apply(ARRAY_ROW_ROW);
assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("rows")).get(0);
assertThat(row.size(), equalTo(5));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
}
@Test
public void testToTableRow_null_row() {
TableRow row = toTableRow().apply(NULL_FLAT_ROW);
assertThat(row.size(), equalTo(5));
assertThat(row, hasEntry("id", null));
assertThat(row, hasEntry("value", null));
assertThat(row, hasEntry("name", null));
assertThat(row, hasEntry("timestamp", null));
assertThat(row, hasEntry("valid", null));
}
private static final BigQueryUtils.ConversionOptions TRUNCATE_OPTIONS =
BigQueryUtils.ConversionOptions.builder()
.setTruncateTimestamps(TruncateTimestamps.TRUNCATE)
.build();
private static final BigQueryUtils.ConversionOptions REJECT_OPTIONS =
BigQueryUtils.ConversionOptions.builder()
.setTruncateTimestamps(TruncateTimestamps.REJECT)
.build();
@Test
public void testSubMilliPrecisionRejected() {
assertThrows(
"precision",
IllegalArgumentException.class,
() ->
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.DATETIME), 1000000001L, REJECT_OPTIONS));
}
@Test
public void testMilliPrecisionOk() {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.DATETIME), millis * 1000, REJECT_OPTIONS),
equalTo(new Instant(millis)));
}
@Test
public void testSubMilliPrecisionTruncated() {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.DATETIME),
millis * 1000 + 123,
TRUNCATE_OPTIONS),
equalTo(new Instant(millis)));
}
@Test
public void testSubMilliPrecisionLogicalTypeRejected() {
assertThrows(
"precision",
IllegalArgumentException.class,
() ->
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
1000000001L,
REJECT_OPTIONS));
}
@Test
public void testMilliPrecisionOkLogicaltype() {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
millis * 1000,
REJECT_OPTIONS),
equalTo(new Instant(millis)));
}
@Test
public void testMilliPrecisionTruncatedLogicaltype() {
long millis = 123456789L;
assertThat(
BigQueryUtils.convertAvroFormat(
Schema.Field.of("dummy", Schema.FieldType.logicalType(new FakeSqlTimeType())),
millis * 1000 + 123,
TRUNCATE_OPTIONS),
equalTo(new Instant(millis)));
}
private static class FakeSqlTimeType implements Schema.LogicalType<Long, Instant> {
@Override
public String getIdentifier() {
return "SqlTimeType";
}
@Override
public Schema.FieldType getBaseType() {
return Schema.FieldType.DATETIME;
}
@Override
public Instant toBaseType(Long input) {
// Already converted to millis outside this constructor
return new Instant((long) input);
}
@Override
public Long toInputType(Instant base) {
return base.getMillis();
}
}
}