blob: da6bebb6eed78e2cdc8906e00361f4f886c940f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.table.dynamic;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
import org.apache.flink.connectors.kudu.format.KuduRowDataInputFormat;
import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
import org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupFunction;
import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly;
/**
* A {@link DynamicTableSource} for Kudu.
*/
public class KuduDynamicTableSource implements ScanTableSource, SupportsProjectionPushDown,
SupportsLimitPushDown, LookupTableSource, SupportsFilterPushDown {
private static final Logger LOG = LoggerFactory.getLogger(KuduDynamicTableSource.class);
private final KuduTableInfo tableInfo;
private final KuduLookupOptions kuduLookupOptions;
private final KuduRowDataInputFormat kuduRowDataInputFormat;
private final transient List<KuduFilterInfo> predicates = Lists.newArrayList();
private KuduReaderConfig.Builder configBuilder;
private TableSchema physicalSchema;
private String[] projectedFields;
private transient List<ResolvedExpression> filters;
public KuduDynamicTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo,
TableSchema physicalSchema, String[] projectedFields,
KuduLookupOptions kuduLookupOptions) {
this.configBuilder = configBuilder;
this.tableInfo = tableInfo;
this.physicalSchema = physicalSchema;
this.projectedFields = projectedFields;
this.kuduRowDataInputFormat = new KuduRowDataInputFormat(configBuilder.build(),
new RowResultRowDataConvertor(), tableInfo,
predicates,
projectedFields == null ? null : Lists.newArrayList(projectedFields));
this.kuduLookupOptions = kuduLookupOptions;
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
int keysLen = context.getKeys().length;
String[] keyNames = new String[keysLen];
for (int i = 0; i < keyNames.length; ++i) {
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(innerKeyArr.length == 1, "Kudu only support non-nested look up keys");
keyNames[i] = this.physicalSchema.getFieldNames()[innerKeyArr[0]];
}
KuduRowDataLookupFunction rowDataLookupFunction = KuduRowDataLookupFunction.Builder.options()
.keyNames(keyNames)
.kuduReaderConfig(configBuilder.build())
.projectedFields(projectedFields)
.tableInfo(tableInfo)
.kuduLookupOptions(kuduLookupOptions)
.build();
return TableFunctionProvider.of(rowDataLookupFunction);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
if (CollectionUtils.isNotEmpty(this.filters)) {
for (ResolvedExpression filter : this.filters) {
Optional<KuduFilterInfo> kuduFilterInfo = KuduTableUtils.toKuduFilterInfo(filter);
if (kuduFilterInfo != null && kuduFilterInfo.isPresent()) {
this.predicates.add(kuduFilterInfo.get());
}
}
}
KuduRowDataInputFormat inputFormat = new KuduRowDataInputFormat(configBuilder.build(),
new RowResultRowDataConvertor(), tableInfo,
this.predicates,
projectedFields == null ? null : Lists.newArrayList(projectedFields));
return InputFormatProvider.of(inputFormat);
}
@Override
public DynamicTableSource copy() {
return new KuduDynamicTableSource(this.configBuilder, this.tableInfo, this.physicalSchema,
this.projectedFields, this.kuduLookupOptions);
}
@Override
public String asSummaryString() {
return "kudu";
}
@Override
public boolean supportsNestedProjection() {
// planner doesn't support nested projection push down yet.
return false;
}
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
// parser projectFields
this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
this.projectedFields = physicalSchema.getFieldNames();
}
private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
Preconditions.checkArgument(
containsPhysicalColumnsOnly(tableSchema),
"Projection is only supported for physical columns.");
TableSchema.Builder builder = TableSchema.builder();
FieldsDataType fields =
(FieldsDataType)
DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
RowType topFields = (RowType) fields.getLogicalType();
for (int i = 0; i < topFields.getFieldCount(); i++) {
builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
}
return builder.build();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KuduDynamicTableSource that = (KuduDynamicTableSource) o;
return Objects.equals(configBuilder, that.configBuilder) && Objects.equals(tableInfo, that.tableInfo) && Objects.equals(physicalSchema, that.physicalSchema) && Arrays.equals(projectedFields, that.projectedFields) && Objects.equals(kuduLookupOptions, that.kuduLookupOptions) && Objects.equals(kuduRowDataInputFormat, that.kuduRowDataInputFormat) && Objects.equals(filters, that.filters) && Objects.equals(predicates, that.predicates);
}
@Override
public int hashCode() {
int result = Objects.hash(configBuilder, tableInfo, physicalSchema,
kuduLookupOptions, kuduRowDataInputFormat, filters, predicates);
result = 31 * result + Arrays.hashCode(projectedFields);
return result;
}
@Override
public void applyLimit(long limit) {
this.configBuilder = this.configBuilder.setRowLimit((int) limit);
}
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
this.filters = filters;
return Result.of(Collections.emptyList(), filters);
}
}