/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.PartitionFetcher;
import org.apache.flink.table.filesystem.PartitionReader;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
/**
* Hive table source that has lookup ability.
*
* <p>The Hive table source supports both lookup and continuous reading. When it acts as a
* continuous read source it does not have lookup ability, but it can serve as a temporal table
* just like other stream sources. When it acts as a bounded table, it has lookup ability.
*
* <p>A common use case is to use a Hive table as a dimension table and always look up the latest
* partition. In that case the source is conceptually a continuous read source, but it is
* currently implemented via a LookupFunction, because a TableSource cannot yet tell downstream
* operators when the latest partition has been fully read. This is a temporary workaround and
* will be re-implemented in the future.
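*
* <p>A sketch of typical SQL usage (table names, columns, and property values below are
* illustrative only):
*
* <pre>{@code
* -- Hive dimension table that always exposes its latest partition
* CREATE TABLE dim_hive (k STRING, v STRING) PARTITIONED BY (dt STRING) TBLPROPERTIES (
*   'streaming-source.enable' = 'true',
*   'streaming-source.partition.include' = 'latest',
*   'streaming-source.monitor-interval' = '1 h'
* );
*
* -- temporal join that looks up the latest partition
* SELECT o.id, d.v
* FROM orders AS o
* JOIN dim_hive FOR SYSTEM_TIME AS OF o.proc_time AS d ON o.k = d.k;
* }</pre>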
*/
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
private static final Logger LOG = LoggerFactory.getLogger(HiveLookupTableSource.class);
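// Default interval for discovering the latest partition when no monitor interval is configured.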
private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
private final Configuration configuration;
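// Interval at which the lookup function reloads the cached table/partition data.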
private Duration hiveTableReloadInterval;
public HiveLookupTableSource(
JobConf jobConf,
ReadableConfig flinkConf,
ObjectPath tablePath,
CatalogTable catalogTable) {
super(jobConf, flinkConf, tablePath, catalogTable);
this.configuration = new Configuration();
catalogTable.getOptions().forEach(configuration::setString);
validateLookupConfigurations();
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
}
@Override
public DynamicTableSource copy() {
HiveLookupTableSource source =
new HiveLookupTableSource(jobConf, flinkConf, tablePath, catalogTable);
source.remainingPartitions = remainingPartitions;
source.projectedFields = projectedFields;
source.limit = limit;
return source;
}
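/**
 * Flattens the planner-provided lookup keys into top-level field indices; for example, keys of
 * the form [[0], [2]] become [0, 2]. Nested keys such as [[0, 1]] are rejected because Hive
 * lookup currently only supports top-level fields.
 */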
@VisibleForTesting
TableFunction<RowData> getLookupFunction(int[][] keys) {
int[] keyIndices = new int[keys.length];
int i = 0;
for (int[] key : keys) {
if (key.length > 1) {
throw new UnsupportedOperationException(
"Hive lookup can not support nested key now.");
}
keyIndices[i] = key[0];
i++;
}
return getLookupFunction(keyIndices);
}
private void validateLookupConfigurations() {
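// 'streaming-source.partition.include' decides what the lookup sees: 'all' (bounded source,
// every partition) or 'latest' (streaming source, only the newest partition).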
String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
if (isStreamingSource()) {
Preconditions.checkArgument(
!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
String.format(
"The '%s' is not supported when set '%s' to 'latest'",
STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
STREAMING_SOURCE_PARTITION_INCLUDE.key()));
Duration monitorInterval =
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
? DEFAULT_LOOKUP_MONITOR_INTERVAL
: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
if (monitorInterval.toMillis() < DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis()) {
LOG.warn(
String.format(
"Currently the recommended value of '%s' is at least '%s' when set '%s' to 'latest',"
+ " but actual is '%s', this may produce big pressure to hive metastore.",
STREAMING_SOURCE_MONITOR_INTERVAL.key(),
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
monitorInterval.toMillis()));
}
hiveTableReloadInterval = monitorInterval;
} else {
Preconditions.checkArgument(
"all".equals(partitionInclude),
String.format(
"The only supported %s for lookup is '%s' in batch source,"
+ " but actual is '%s'",
STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
hiveTableReloadInterval = configuration.get(LOOKUP_JOIN_CACHE_TTL);
}
}
private TableFunction<RowData> getLookupFunction(int[] keys) {
final String defaultPartitionName = JobConfUtils.getDefaultPartitionName(jobConf);
PartitionFetcher.Context<HiveTablePartition> fetcherContext =
new HiveTablePartitionFetcherContext(
tablePath,
hiveShim,
new JobConfWrapper(jobConf),
catalogTable.getPartitionKeys(),
getProducedTableSchema().getFieldDataTypes(),
getProducedTableSchema().getFieldNames(),
configuration,
defaultPartitionName);
final PartitionFetcher<HiveTablePartition> partitionFetcher;
// copy to a local variable so the lambdas below do not capture and serialize this source
final ObjectPath tableFullPath = tablePath;
if (catalogTable.getPartitionKeys().isEmpty()) {
// Non-partitioned table: the fetcher returns the single partition that represents the
// whole table.
partitionFetcher =
context -> {
List<HiveTablePartition> partValueList = new ArrayList<>();
partValueList.add(
context.getPartition(new ArrayList<>())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"Fetch partition fail for hive table %s.",
tableFullPath))));
return partValueList;
};
} else if (isStreamingSource()) {
// Streaming-read partitioned table: the fetcher returns the latest partition of the
// given table.
partitionFetcher =
context -> {
List<HiveTablePartition> partValueList = new ArrayList<>();
List<PartitionFetcher.Context.ComparablePartitionValue>
comparablePartitionValues =
context.getComparablePartitionValueList();
// fetch latest partitions for partitioned table
if (!comparablePartitionValues.isEmpty()) {
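// The comparator encodes the configured partition order (e.g. partition name or
// create time); see HivePartitionFetcherContextBase for how it is derived.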
// sort in desc order
comparablePartitionValues.sort(
(o1, o2) -> o2.getComparator().compareTo(o1.getComparator()));
PartitionFetcher.Context.ComparablePartitionValue maxPartition =
comparablePartitionValues.get(0);
partValueList.add(
context.getPartition(
(List<String>) maxPartition.getPartitionValue())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"Fetch partition fail for hive table %s.",
tableFullPath))));
} else {
throw new IllegalArgumentException(
String.format(
"At least one partition is required when set '%s' to 'latest' in temporal join,"
+ " but actual partition number is '%s' for hive table %s",
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
comparablePartitionValues.size(),
tableFullPath));
}
return partValueList;
};
} else {
// Bounded-read partitioned table: the fetcher returns all partitions of the given
// table.
partitionFetcher =
context -> {
List<HiveTablePartition> partValueList = new ArrayList<>();
List<PartitionFetcher.Context.ComparablePartitionValue>
comparablePartitionValues =
context.getComparablePartitionValueList();
for (PartitionFetcher.Context.ComparablePartitionValue
comparablePartitionValue : comparablePartitionValues) {
partValueList.add(
context.getPartition(
(List<String>)
comparablePartitionValue
.getPartitionValue())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"Fetch partition fail for hive table %s.",
tableFullPath))));
}
return partValueList;
};
}
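// Reads the fetched partitions with Hive's input formats, applying the projected fields
// so only the required columns are produced.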
PartitionReader<HiveTablePartition, RowData> partitionReader =
new HiveInputFormatPartitionReader(
jobConf,
hiveVersion,
tablePath,
getTableSchema().getFieldDataTypes(),
getTableSchema().getFieldNames(),
catalogTable.getPartitionKeys(),
projectedFields,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
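// The lookup function loads all rows of the fetched partitions into an in-memory cache
// and reloads them every hiveTableReloadInterval.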
return new FileSystemLookupFunction<>(
partitionFetcher,
fetcherContext,
partitionReader,
(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
keys,
hiveTableReloadInterval);
}
/** PartitionFetcher.Context for {@link HiveTablePartition}. */
static class HiveTablePartitionFetcherContext
extends HivePartitionFetcherContextBase<HiveTablePartition> {
private static final long serialVersionUID = 1L;
public HiveTablePartitionFetcherContext(
ObjectPath tablePath,
HiveShim hiveShim,
JobConfWrapper confWrapper,
List<String> partitionKeys,
DataType[] fieldTypes,
String[] fieldNames,
Configuration configuration,
String defaultPartitionName) {
super(
tablePath,
hiveShim,
confWrapper,
partitionKeys,
fieldTypes,
fieldNames,
configuration,
defaultPartitionName);
}
@Override
public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
Preconditions.checkArgument(
partitionKeys.size() == partValues.size(),
String.format(
"The partition keys length should equal to partition values length, "
+ "but partition keys length is %s and partition values length is %s",
partitionKeys.size(), partValues.size()));
if (partitionKeys.isEmpty()) {
// get partition that represents the non-partitioned table.
return Optional.of(new HiveTablePartition(tableSd, tableProps));
} else {
try {
Partition partition =
metaStoreClient.getPartition(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
partValues);
HiveTablePartition hiveTablePartition =
HivePartitionUtils.toHiveTablePartition(
partitionKeys, tableProps, partition);
return Optional.of(hiveTablePartition);
} catch (NoSuchObjectException e) {
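// The partition may have been dropped between listing and fetching; treat it as absent.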
return Optional.empty();
}
}
}
}
}