| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg.spark.source; |
| |
import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
| |
| /** |
| * Base class of Spark readers. |
| * |
| * @param <T> is the Java class returned by this reader whose objects contain one or more rows. |
| */ |
| abstract class BaseDataReader<T> implements Closeable { |
| private final Iterator<FileScanTask> tasks; |
| private final Map<String, InputFile> inputFiles; |
| |
| private CloseableIterator<T> currentIterator; |
| private T current = null; |
| |
| BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) { |
| this.tasks = task.files().iterator(); |
| Stream<EncryptedInputFile> encrypted = task.files().stream() |
| .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream())) |
| .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata())); |
| |
| // decrypt with the batch call to avoid multiple RPCs to a key server, if possible |
| Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator); |
| |
| ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder(); |
| decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted)); |
| this.inputFiles = inputFileBuilder.build(); |
| this.currentIterator = CloseableIterator.empty(); |
| } |
| |
| public boolean next() throws IOException { |
| while (true) { |
| if (currentIterator.hasNext()) { |
| this.current = currentIterator.next(); |
| return true; |
| } else if (tasks.hasNext()) { |
| this.currentIterator.close(); |
| this.currentIterator = open(tasks.next()); |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| public T get() { |
| return current; |
| } |
| |
| abstract CloseableIterator<T> open(FileScanTask task); |
| |
| @Override |
| public void close() throws IOException { |
| InputFileBlockHolder.unset(); |
| |
| // close the current iterator |
| this.currentIterator.close(); |
| |
| // exhaust the task iterator |
| while (tasks.hasNext()) { |
| tasks.next(); |
| } |
| } |
| |
| protected InputFile getInputFile(FileScanTask task) { |
| Preconditions.checkArgument(!task.isDataTask(), "Invalid task type"); |
| return inputFiles.get(task.file().path().toString()); |
| } |
| |
| protected InputFile getInputFile(String location) { |
| return inputFiles.get(location); |
| } |
| |
| protected static Object convertConstant(Type type, Object value) { |
| if (value == null) { |
| return null; |
| } |
| |
| switch (type.typeId()) { |
| case DECIMAL: |
| return Decimal.apply((BigDecimal) value); |
| case STRING: |
| if (value instanceof Utf8) { |
| Utf8 utf8 = (Utf8) value; |
| return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength()); |
| } |
| return UTF8String.fromString(value.toString()); |
| case FIXED: |
| if (value instanceof byte[]) { |
| return value; |
| } else if (value instanceof GenericData.Fixed) { |
| return ((GenericData.Fixed) value).bytes(); |
| } |
| return ByteBuffers.toByteArray((ByteBuffer) value); |
| case BINARY: |
| return ByteBuffers.toByteArray((ByteBuffer) value); |
| default: |
| } |
| return value; |
| } |
| } |