/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;

/**
* A transform to drop fields from a schema.
*
* <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop
* is specified, and all fields in the schema that are not specified are selected. For example:
*
* <pre>{@code @DefaultSchema(JavaFieldSchema.class)
* public class UserEvent {
* public String userId;
* public String eventId;
* public int eventType;
* public Location location;
* }}</pre>
*
* <pre>{@code @DefaultSchema(JavaFieldSchema.class)
* public class Location {
* public double latitude;
* public double longitude;
* }
*
* PCollection<UserEvent> events = readUserEvents();
* // Drop the location field.
* PCollection<Row> noLocation = events.apply(DropFields.fields("location"));
* // Drop the latitude field.
* PCollection<Row> noLatitude = events.apply(DropFields.fields("location.latitude"));
* }</pre>
*/
@Experimental(Kind.SCHEMAS)
public class DropFields {
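/** Drops the fields with the given names; nested fields may be specified, e.g. "location.latitude". */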
public static <T> Inner<T> fields(String... fields) {
return fields(FieldAccessDescriptor.withFieldNames(fields));
}
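/** Drops the fields with the given field ids from the schema. */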
public static <T> Inner<T> fields(Integer... fieldIds) {
return fields(FieldAccessDescriptor.withFieldIds(fieldIds));
}
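/** Drops all fields matched by the given {@link FieldAccessDescriptor}. */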
public static <T> Inner<T> fields(FieldAccessDescriptor fieldsToDrop) {
return new Inner<>(fieldsToDrop);
}
/** Implementation class for DropFields. */
public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
private final FieldAccessDescriptor fieldsToDrop;
private Inner(FieldAccessDescriptor fieldsToDrop) {
this.fieldsToDrop = fieldsToDrop;
}
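/**
* Returns a {@link FieldAccessDescriptor} that selects every field in {@code inputSchema} not
* selected by {@code input}, recursing into nested row fields and preserving the schema's
* original field order.
*/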
FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input) {
// Create a FieldAccessDescriptor that selects all fields _not_ selected in the input
// descriptor, maintaining the original order of the schema.
Set<String> fieldNamesToSelect = Sets.newHashSet();
Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect =
Maps.newHashMap();
for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
if (input.fieldIdsAccessed().contains(i)) {
// This field is selected, so exclude it from the complement.
continue;
}
Field field = inputSchema.getField(i);
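// Index the nested (partially-selected) fields of the input descriptor by field id.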
Map<Integer, FieldAccessDescriptor.FieldDescriptor> nestedFields =
input.getNestedFieldsAccessed().keySet().stream()
.collect(Collectors.toMap(k -> k.getFieldId(), k -> k));
FieldAccessDescriptor.FieldDescriptor fieldDescriptor = nestedFields.get(i);
if (fieldDescriptor != null) {
// Some subfields are selected, so recursively calculate the complementary subfields to
// select.
FieldType fieldType = inputSchema.getField(i).getType();
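// Unwrap any list/map qualifiers on the field to reach the element/value type that contains
// the selected subfields.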
for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier :
fieldDescriptor.getQualifiers()) {
switch (qualifier.getKind()) {
case LIST:
fieldType = fieldType.getCollectionElementType();
break;
case MAP:
fieldType = fieldType.getMapValueType();
break;
default:
throw new RuntimeException("Unexpected field descriptor type.");
}
}
checkArgument(fieldType.getTypeName().isCompositeType());
FieldAccessDescriptor nestedDescriptor =
input.getNestedFieldsAccessed().get(fieldDescriptor);
nestedFieldsToSelect.put(
fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
} else {
// Neither the field nor any of its subfields is selected, so include the whole field in
// the complement.
fieldNamesToSelect.add(field.getName());
}
}
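// Combine the fully-selected field names with the recursively-computed nested selections and
// resolve the result against the input schema.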
FieldAccessDescriptor fieldAccess = FieldAccessDescriptor.withFieldNames(fieldNamesToSelect);
for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> entry :
nestedFieldsToSelect.entrySet()) {
fieldAccess = fieldAccess.withNestedField(entry.getKey(), entry.getValue());
}
return fieldAccess.resolve(inputSchema);
}
@Override
public PCollection<Row> expand(PCollection<T> input) {
Schema inputSchema = input.getSchema();
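// Compute the complement of the fields to drop, then delegate to Select to keep only those
// fields.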
FieldAccessDescriptor selectDescriptor =
complement(inputSchema, fieldsToDrop.resolve(inputSchema));
return input.apply(Select.fieldAccess(selectDescriptor));
}
}
}