blob: fc3d548086b8266019991294d8f9f0e0458bf0c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.hive.readers;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.hive.HivePartition;
import org.apache.drill.exec.store.hive.HiveSubScan;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Factory for creation of Hive record readers used by {@link org.apache.drill.exec.store.hive.HiveScanBatchCreator}.
*/
public class ReadersInitializer {
private static final String TEXT_FORMAT = TextInputFormat.class.getCanonicalName();
/**
* Selects reader constructor reference as {@link HiveReaderFactory} readerFactory.
* Then check if input splits are empty creates empty record reader, or one reader per split otherwise.
*
* @param ctx context related to fragment
* @param config context which holds different Hive configurations
* @return list containing one or more readers
*/
public static List<RecordReader> init(ExecutorFragmentContext ctx, HiveSubScan config) {
final HiveReaderFactory readerFactory = getReaderFactory(config);
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
final List<List<InputSplit>> inputSplits = config.getInputSplits();
final HiveConf hiveConf = config.getHiveConf();
if (inputSplits.isEmpty()) {
return Collections.singletonList(
readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
);
} else {
IndexedPartitions partitions = getPartitions(config);
return IntStream.range(0, inputSplits.size())
.mapToObj(idx ->
readerFactory.createReader(
config.getTable(),
partitions.get(idx),
inputSplits.get(idx),
config.getColumns(),
ctx, hiveConf, proxyUgi))
.collect(Collectors.toList());
}
}
/**
* Returns reference to {@link HiveTextRecordReader}'s constructor if
* file being read has text format or reference to {@link HiveDefaultRecordReader}'s
* constructor otherwise.
*
* @param config context which holds different Hive configurations
* @return reference to concrete reader constructor which is unified under type {@link HiveReaderFactory}
*/
private static HiveReaderFactory getReaderFactory(HiveSubScan config) {
String inputFormat = config.getTable().getSd().getInputFormat();
return TEXT_FORMAT.equals(inputFormat) ? HiveTextRecordReader::new : HiveDefaultRecordReader::new;
}
/**
* Used to select logic for partitions retrieval by index.
* If table hasn't partitions, get by index just returns null.
*
* @param config context which holds different Hive configurations
* @return get by index lambda expression unified under type {@link IndexedPartitions}
*/
private static IndexedPartitions getPartitions(HiveSubScan config) {
return (config.getPartitions() == null || config.getPartitions().isEmpty()) ? (idx) -> null : config.getPartitions()::get;
}
/**
* Functional interface used to describe parameters accepted by concrete
* readers constructor.
*/
@FunctionalInterface
private interface HiveReaderFactory {
RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
}
/**
* Functional interface used to represent get partition
* by index logic.
*/
@FunctionalInterface
private interface IndexedPartitions {
HivePartition get(int idx);
}
}