blob: 5f8f2272116040febe673bf0fb55514d117ee9a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
/**
 * A {@link DataFile} implementation backed by a Spark SQL {@link Row}.
 *
 * <p>Column positions are resolved once, against the Spark schema, in the constructor; each
 * accessor then reads the wrapped row positionally. Instances are mutable and reusable: call
 * {@link #wrap(Row)} to point this adapter at a new row without reallocating.
 *
 * <p>Not thread-safe: {@code wrap} mutates shared state.
 */
public class SparkDataFile implements DataFile {

  // Resolved ordinal of each metadata column within the wrapped row.
  private final int filePathPos;
  private final int fileFormatPos;
  private final int partitionPos;
  private final int recordCountPos;
  private final int fileSizeInBytesPos;
  private final int columnSizesPos;
  private final int valueCountsPos;
  private final int nullValueCountsPos;
  private final int nanValueCountsPos;
  private final int lowerBoundsPos;
  private final int upperBoundsPos;
  private final int keyMetadataPos;
  private final int splitOffsetsPos;

  // Iceberg types used to convert Spark values back to Iceberg representations.
  private final Type lowerBoundsType;
  private final Type upperBoundsType;
  private final Type keyMetadataType;

  // Reusable adapter for the nested partition struct.
  private final SparkStructLike wrappedPartition;
  private Row wrapped;

  /**
   * Creates an adapter for rows matching the given schemas.
   *
   * @param type the Iceberg struct type describing the data-file metadata columns
   * @param sparkType the corresponding Spark schema used to resolve column ordinals
   */
  public SparkDataFile(Types.StructType type, StructType sparkType) {
    this.lowerBoundsType = type.fieldType("lower_bounds");
    this.upperBoundsType = type.fieldType("upper_bounds");
    this.keyMetadataType = type.fieldType("key_metadata");
    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());

    // Resolve every field's ordinal up front so accessors are simple positional reads.
    Map<String, Integer> positions = Maps.newHashMap();
    for (Types.NestedField field : type.fields()) {
      String fieldName = field.name();
      positions.put(fieldName, fieldPosition(fieldName, sparkType));
    }

    this.filePathPos = positions.get("file_path");
    this.fileFormatPos = positions.get("file_format");
    this.partitionPos = positions.get("partition");
    this.recordCountPos = positions.get("record_count");
    this.fileSizeInBytesPos = positions.get("file_size_in_bytes");
    this.columnSizesPos = positions.get("column_sizes");
    this.valueCountsPos = positions.get("value_counts");
    this.nullValueCountsPos = positions.get("null_value_counts");
    this.nanValueCountsPos = positions.get("nan_value_counts");
    this.lowerBoundsPos = positions.get("lower_bounds");
    this.upperBoundsPos = positions.get("upper_bounds");
    this.keyMetadataPos = positions.get("key_metadata");
    this.splitOffsetsPos = positions.get("split_offsets");
  }

  /**
   * Points this adapter at a new row and returns {@code this} for chaining.
   *
   * <p>The partition struct is re-wrapped only for partitioned tables; for unpartitioned tables
   * the partition column is absent and its position is -1.
   */
  public SparkDataFile wrap(Row row) {
    this.wrapped = row;
    if (wrappedPartition.size() > 0) {
      wrappedPartition.wrap(row.getAs(partitionPos));
    }
    return this;
  }

  @Override
  public Long pos() {
    // Row position is not tracked by this adapter.
    return null;
  }

  @Override
  public int specId() {
    // Partition spec id is not carried in the wrapped row.
    return -1;
  }

  @Override
  public CharSequence path() {
    return wrapped.getAs(filePathPos);
  }

  @Override
  public FileFormat format() {
    // Format is stored as a string; normalize case with a fixed locale before the enum lookup.
    String name = wrapped.getString(fileFormatPos).toUpperCase(Locale.ROOT);
    return FileFormat.valueOf(name);
  }

  @Override
  public StructLike partition() {
    return wrappedPartition;
  }

  @Override
  public long recordCount() {
    return wrapped.getAs(recordCountPos);
  }

  @Override
  public long fileSizeInBytes() {
    return wrapped.getAs(fileSizeInBytesPos);
  }

  @Override
  public Map<Integer, Long> columnSizes() {
    if (wrapped.isNullAt(columnSizesPos)) {
      return null;
    }
    return wrapped.getJavaMap(columnSizesPos);
  }

  @Override
  public Map<Integer, Long> valueCounts() {
    if (wrapped.isNullAt(valueCountsPos)) {
      return null;
    }
    return wrapped.getJavaMap(valueCountsPos);
  }

  @Override
  public Map<Integer, Long> nullValueCounts() {
    if (wrapped.isNullAt(nullValueCountsPos)) {
      return null;
    }
    return wrapped.getJavaMap(nullValueCountsPos);
  }

  @Override
  public Map<Integer, Long> nanValueCounts() {
    if (wrapped.isNullAt(nanValueCountsPos)) {
      return null;
    }
    return wrapped.getJavaMap(nanValueCountsPos);
  }

  @Override
  public Map<Integer, ByteBuffer> lowerBounds() {
    Map<?, ?> raw = wrapped.isNullAt(lowerBoundsPos) ? null : wrapped.getJavaMap(lowerBoundsPos);
    return convert(lowerBoundsType, raw);
  }

  @Override
  public Map<Integer, ByteBuffer> upperBounds() {
    Map<?, ?> raw = wrapped.isNullAt(upperBoundsPos) ? null : wrapped.getJavaMap(upperBoundsPos);
    return convert(upperBoundsType, raw);
  }

  @Override
  public ByteBuffer keyMetadata() {
    return convert(keyMetadataType, wrapped.get(keyMetadataPos));
  }

  @Override
  public DataFile copy() {
    throw new UnsupportedOperationException("Not implemented: copy");
  }

  @Override
  public DataFile copyWithoutStats() {
    throw new UnsupportedOperationException("Not implemented: copyWithoutStats");
  }

  @Override
  public List<Long> splitOffsets() {
    if (wrapped.isNullAt(splitOffsetsPos)) {
      return null;
    }
    return wrapped.getList(splitOffsetsPos);
  }

  /**
   * Resolves a field name to its ordinal in the Spark schema.
   *
   * <p>For unpartitioned tables the partition field is absent from the Spark schema; in that
   * case -1 is returned and the partition accessors are never exercised.
   */
  private int fieldPosition(String name, StructType sparkType) {
    try {
      return sparkType.fieldIndex(name);
    } catch (IllegalArgumentException e) {
      // the partition field is absent for unpartitioned tables
      if (name.equals("partition") && wrappedPartition.size() == 0) {
        return -1;
      }
      throw e;
    }
  }

  // Narrow-scoped suppression: SparkValueConverter returns Object; callers fix the target type.
  @SuppressWarnings("unchecked")
  private <T> T convert(Type valueType, Object value) {
    return (T) SparkValueConverter.convert(valueType, value);
  }
}