// blob: 2682fc025ea18524969f1a8e9ffff243c39c5321
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.utils;
import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Test;
/**
 * Tests for {@link SelectHelpers}.
 *
 * <p>Each test resolves a {@link FieldAccessDescriptor} against one of the fixture schemas below,
 * checks the schema returned by {@code SelectHelpers.getOutputSchema}, and then checks the row
 * returned by {@code SelectHelpers.selectRow} for the matching fixture row.
 */
public class SelectHelpersTest {
  /** Flat schema of four primitive fields; the leaf row type used by every other fixture. */
  static final Schema FLAT_SCHEMA =
      Schema.builder()
          .addStringField("field1")
          .addInt32Field("field2")
          .addDoubleField("field3")
          .addStringField("field_extra")
          .build();

  /** A row of {@link #FLAT_SCHEMA} holding {@code ("first", 42, 3.14, "extra")}. */
  static final Row FLAT_ROW =
      Row.withSchema(FLAT_SCHEMA).addValues("first", 42, 3.14, "extra").build();

  /** Schema with a row field {@code nested} of {@link #FLAT_SCHEMA} and a string field. */
  static final Schema NESTED_SCHEMA =
      Schema.builder().addRowField("nested", FLAT_SCHEMA).addStringField("foo").build();

  /** A row of {@link #NESTED_SCHEMA} wrapping {@link #FLAT_ROW}, with an empty {@code foo}. */
  static final Row NESTED_ROW = Row.withSchema(NESTED_SCHEMA).addValues(FLAT_ROW, "").build();

  /** Schema with a single row field {@code nested2} of {@link #NESTED_SCHEMA}. */
  static final Schema DOUBLE_NESTED_SCHEMA =
      Schema.builder().addRowField("nested2", NESTED_SCHEMA).build();

  /** A row of {@link #DOUBLE_NESTED_SCHEMA} wrapping {@link #NESTED_ROW}. */
  static final Row DOUBLE_NESTED_ROW =
      Row.withSchema(DOUBLE_NESTED_SCHEMA).addValue(NESTED_ROW).build();

  /**
   * Schema of four array fields: an array of ints, an array of flat rows, an array of arrays of
   * flat rows, and an array of nested rows.
   */
  static final Schema ARRAY_SCHEMA =
      Schema.builder()
          .addArrayField("primitiveArray", FieldType.INT32)
          .addArrayField("rowArray", FieldType.row(FLAT_SCHEMA))
          .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(FLAT_SCHEMA)))
          .addArrayField("nestedRowArray", FieldType.row(NESTED_SCHEMA))
          .build();

  /** A row of {@link #ARRAY_SCHEMA}; every array field holds two elements. */
  static final Row ARRAY_ROW =
      Row.withSchema(ARRAY_SCHEMA)
          .addArray(1, 2)
          .addArray(FLAT_ROW, FLAT_ROW)
          .addArray(ImmutableList.of(FLAT_ROW), ImmutableList.of(FLAT_ROW))
          .addArray(NESTED_ROW, NESTED_ROW)
          .build();

  /** Schema with a single map field from int keys to {@link #FLAT_SCHEMA} rows. */
  static final Schema MAP_SCHEMA =
      Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build();

  /** A row of {@link #MAP_SCHEMA} whose map holds the single entry {@code 1 -> FLAT_ROW}. */
  static final Row MAP_ROW =
      Row.withSchema(MAP_SCHEMA).addValue(ImmutableMap.of(1, FLAT_ROW)).build();

  /** Schema with a single map field from int keys to arrays of {@link #FLAT_SCHEMA} rows. */
  static final Schema MAP_ARRAY_SCHEMA =
      Schema.builder()
          .addMapField("map", FieldType.INT32, FieldType.array(FieldType.row(FLAT_SCHEMA)))
          .build();

  /** A row of {@link #MAP_ARRAY_SCHEMA} whose map holds the single entry {@code 1 -> [FLAT_ROW]}. */
  static final Row MAP_ARRAY_ROW =
      Row.withSchema(MAP_ARRAY_SCHEMA)
          .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW)))
          .build();

  /** Selecting {@code *} on a flat schema returns the input schema and row unchanged. */
  @Test
  public void testSelectAll() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("*").resolve(FLAT_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor);
    assertEquals(FLAT_SCHEMA, outputSchema);

    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema);
    assertEquals(FLAT_ROW, row);
  }

  /** Selecting one top-level field yields a single-field schema and row. */
  @Test
  public void testsSimpleSelectSingle() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("field1").resolve(FLAT_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addStringField("field1").build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
    assertEquals(expectedRow, row);
  }

  /** A field whose name contains an underscore can be selected by name. */
  @Test
  public void testsSimpleSelectSingleWithUnderscore() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("field_extra").resolve(FLAT_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addStringField("field_extra").build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue("extra").build();
    assertEquals(expectedRow, row);
  }

  /** Selecting two non-adjacent top-level fields keeps only those two fields. */
  @Test
  public void testsSimpleSelectMultiple() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("field1", "field3").resolve(FLAT_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addStringField("field1").addDoubleField("field3").build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValues("first", 3.14).build();
    assertEquals(expectedRow, row);
  }

  /** Selecting a row-typed field by name keeps it as a row field in the output. */
  @Test
  public void testSelectedNested() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("nested").resolve(NESTED_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addRowField("nested", FLAT_SCHEMA).build();
    assertEquals(expectedSchema, outputSchema);

    Row row =
        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue(FLAT_ROW).build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code nested.field1} lifts the leaf field to the top level of the output. */
  @Test
  public void testSelectedNestedSingle() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("nested.field1").resolve(NESTED_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addStringField("field1").build();
    assertEquals(expectedSchema, outputSchema);

    Row row =
        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code nested.*} yields the nested row's own schema and values. */
  @Test
  public void testSelectedNestedWildcard() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("nested.*").resolve(NESTED_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor);
    assertEquals(FLAT_SCHEMA, outputSchema);

    Row row =
        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema);
    assertEquals(FLAT_ROW, row);
  }

  /** Selecting a leaf through two levels of row nesting lifts it to the top level. */
  @Test
  public void testSelectDoubleNested() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("nested2.nested.field1").resolve(DOUBLE_NESTED_SCHEMA);
    Schema outputSchema =
        SelectHelpers.getOutputSchema(DOUBLE_NESTED_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addStringField("field1").build();
    assertEquals(expectedSchema, outputSchema);

    Row row =
        SelectHelpers.selectRow(
            DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
    assertEquals(expectedRow, row);
  }

  /** Selecting an array-of-primitive field keeps the array intact. */
  @Test
  public void testSelectArrayOfPrimitive() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("primitiveArray").resolve(ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addArrayField("primitiveArray", FieldType.INT32).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build();
    assertEquals(expectedRow, row);
  }

  /** Selecting an array-of-row field by name keeps the whole rows as elements. */
  @Test
  public void testSelectArrayOfRow() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("rowArray").resolve(ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addArrayField("rowArray", FieldType.row(FLAT_SCHEMA)).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW, FLAT_ROW).build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code rowArray[].field1} yields an array of that field, named after the leaf. */
  @Test
  public void testSelectArrayOfRowPartial() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build();
    assertEquals(expectedRow, row);
  }

  /** Selecting a leaf through an array of arrays of rows preserves both array levels. */
  @Test
  public void testSelectArrayOfRowArray() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addArrayField("field1", FieldType.array(FieldType.STRING)).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema);
    Row expectedRow =
        Row.withSchema(expectedSchema)
            .addArray(ImmutableList.of("first"), ImmutableList.of("first"))
            .build();
    assertEquals(expectedRow, row);
  }

  /** Selecting a leaf of a row nested inside array elements yields a flat array of that leaf. */
  @Test
  public void testSelectArrayOfNestedRow() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("nestedRowArray[].nested.field1")
            .resolve(ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code map{}.field1} yields a map with the same keys and the leaf as value. */
  @Test
  public void testSelectMapOfRowSelectSingle() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("map{}.field1").resolve(MAP_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addMapField("field1", FieldType.INT32, FieldType.STRING).build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema);
    Row expectedRow = Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code map{}.*} yields one map field per field of the map's row value type. */
  @Test
  public void testSelectMapOfRowSelectAll() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder()
            .addMapField("field1", FieldType.INT32, FieldType.STRING)
            .addMapField("field2", FieldType.INT32, FieldType.INT32)
            .addMapField("field3", FieldType.INT32, FieldType.DOUBLE)
            .addMapField("field_extra", FieldType.INT32, FieldType.STRING)
            .build();
    assertEquals(expectedSchema, outputSchema);

    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema);
    Row expectedRow =
        Row.withSchema(expectedSchema)
            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1)))
            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2)))
            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(3)))
            .build();
    assertEquals(expectedRow, row);
  }

  /**
   * Selecting {@code map.field1} (written without {@code {}} or {@code []} qualifiers) through a
   * map of arrays of rows yields a map from the same keys to arrays of the leaf value.
   */
  @Test
  public void testSelectMapOfArray() {
    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA);
    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder()
            .addMapField("field1", FieldType.INT32, FieldType.array(FieldType.STRING))
            .build();
    assertEquals(expectedSchema, outputSchema);

    Row row =
        SelectHelpers.selectRow(
            MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, outputSchema);
    Row expectedRow =
        Row.withSchema(expectedSchema)
            .addValue(ImmutableMap.of(1, ImmutableList.of("first")))
            .build();
    assertEquals(expectedRow, row);
  }

  /** Selecting {@code f2.f1}, itself a row, yields a schema and row equal to {@code f2}/{@code r2}. */
  @Test
  public void testSelectFieldOfRecord() {
    Schema f1 = Schema.builder().addInt64Field("f0").build();
    Schema f2 = Schema.builder().addRowField("f1", f1).build();
    Schema f3 = Schema.builder().addRowField("f2", f2).build();

    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
    Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
    Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}}

    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("f2.f1").resolve(f3);
    Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor);
    Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema);

    assertEquals(f2, outputSchema);
    assertEquals(r2, out);
  }

  /**
   * Selecting {@code f3.f2}, a row that itself contains a row, yields a schema and row equal to
   * {@code f3}/{@code r3}.
   */
  @Test
  public void testSelectFieldOfRecordOrRecord() {
    Schema f1 = Schema.builder().addInt64Field("f0").build();
    Schema f2 = Schema.builder().addRowField("f1", f1).build();
    Schema f3 = Schema.builder().addRowField("f2", f2).build();
    Schema f4 = Schema.builder().addRowField("f3", f3).build();

    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
    Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
    Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}}
    Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": {"f0": 42}}}}

    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4);
    Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor);
    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema);

    assertEquals(f3, outputSchema);
    assertEquals(r3, out);
  }

  /**
   * Selecting a leaf through array -> row -> array -> row (path written without {@code []}
   * qualifiers) yields an array of arrays of the leaf value.
   */
  @Test
  public void testArrayRowArray() {
    Schema f1 = Schema.builder().addStringField("f0").build();
    Schema f2 = Schema.builder().addArrayField("f1", FieldType.row(f1)).build();
    Schema f3 = Schema.builder().addRowField("f2", f2).build();
    Schema f4 = Schema.builder().addArrayField("f3", FieldType.row(f3)).build();

    Row r1 = Row.withSchema(f1).addValue("first").build();
    Row r2 = Row.withSchema(f2).addArray(r1, r1).build();
    Row r3 = Row.withSchema(f3).addValue(r2).build();
    Row r4 = Row.withSchema(f4).addArray(r3, r3).build();

    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(f4);
    Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor);
    Schema expectedSchema =
        Schema.builder().addArrayField("f0", FieldType.array(FieldType.STRING)).build();
    assertEquals(expectedSchema, outputSchema);

    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema);
    // Build the expectation from the hand-written schema, not from the value under test.
    Row expected =
        Row.withSchema(expectedSchema)
            .addArray(Lists.newArrayList("first", "first"), Lists.newArrayList("first", "first"))
            .build();
    assertEquals(expected, out);
  }
}