/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.iceberg;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ListMultimap;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.iceberg.format.IcebergFormatPlugin;
import org.apache.drill.exec.store.iceberg.plan.DrillExprToIcebergTranslator;
import org.apache.drill.exec.store.iceberg.snapshot.Snapshot;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopTables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
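
/**
 * Group scan for the Iceberg format plugin. Loads the Iceberg table at the
 * given path, plans a {@link TableScan} with the pushed-down filter and
 * projection applied, splits the resulting scan tasks into work units, and
 * distributes those units across drillbit endpoints.
 */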
@JsonTypeName("iceberg-scan")
@SuppressWarnings("unused")
public class IcebergGroupScan extends AbstractGroupScan {
private static final Logger logger = LoggerFactory.getLogger(IcebergGroupScan.class);
private final IcebergFormatPlugin formatPlugin;
private final String path;
private final TupleMetadata schema;
private final LogicalExpression condition;
private final DrillFileSystem fs;
private final List<SchemaPath> columns;
private int maxRecords;
private List<IcebergCompleteWork> chunks;
private TableScan tableScan;
private List<EndpointAffinity> endpointAffinities;
private ListMultimap<Integer, IcebergCompleteWork> mappings;
@JsonCreator
public IcebergGroupScan(
@JsonProperty("userName") String userName,
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JsonProperty("format") FormatPluginConfig formatConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("schema") TupleMetadata schema,
@JsonProperty("path") String path,
@JsonProperty("condition") LogicalExpression condition,
@JsonProperty("maxRecords") Integer maxRecords,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
this(builder()
.userName(userName)
.formatPlugin(pluginRegistry.resolveFormat(storageConfig, formatConfig, IcebergFormatPlugin.class))
.schema(schema)
.path(path)
.condition(condition)
.columns(columns)
// A missing "maxRecords" property would arrive as null, and unboxing it into
// the builder's int parameter would throw an NPE; treat null as "no limit".
.maxRecords(maxRecords == null ? -1 : maxRecords));
}
private IcebergGroupScan(IcebergGroupScanBuilder builder) throws IOException {
super(builder.userName);
this.formatPlugin = builder.formatPlugin;
this.columns = builder.columns;
this.path = builder.path;
this.schema = builder.schema;
this.condition = builder.condition;
this.maxRecords = builder.maxRecords;
this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
this.tableScan = initTableScan(formatPlugin, path, condition);
init();
}
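
/**
 * Creates an Iceberg {@link TableScan} for the table at {@code path} and
 * configures it from the format plugin config: custom scan options, the
 * pushed-down filter condition (translated to an Iceberg {@link Expression}),
 * an optional snapshot, and the case-sensitivity, column-stats and
 * residual-handling flags.
 */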
public static TableScan initTableScan(IcebergFormatPlugin formatPlugin, String path, LogicalExpression condition) {
TableScan tableScan = new HadoopTables(formatPlugin.getFsConf()).load(path).newScan();
Map<String, String> properties = formatPlugin.getConfig().getProperties();
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
tableScan = tableScan.option(entry.getKey(), entry.getValue());
}
}
if (condition != null) {
Expression expression = condition.accept(
DrillExprToIcebergTranslator.INSTANCE, null);
tableScan = tableScan.filter(expression);
}
Snapshot snapshot = formatPlugin.getConfig().getSnapshot();
if (snapshot != null) {
tableScan = snapshot.apply(tableScan);
}
Boolean caseSensitive = formatPlugin.getConfig().getCaseSensitive();
if (caseSensitive != null) {
tableScan = tableScan.caseSensitive(caseSensitive);
}
Boolean includeColumnStats = formatPlugin.getConfig().getIncludeColumnStats();
if (includeColumnStats != null && includeColumnStats) {
tableScan = tableScan.includeColumnStats();
}
Boolean ignoreResiduals = formatPlugin.getConfig().getIgnoreResiduals();
if (ignoreResiduals != null && ignoreResiduals) {
tableScan = tableScan.ignoreResiduals();
}
return tableScan;
}
/**
* Private constructor, used for cloning.
*
* @param that The IcebergGroupScan to clone
*/
private IcebergGroupScan(IcebergGroupScan that) {
super(that);
this.columns = that.columns;
this.formatPlugin = that.formatPlugin;
this.path = that.path;
this.condition = that.condition;
this.schema = that.schema;
this.mappings = that.mappings;
this.fs = that.fs;
this.maxRecords = that.maxRecords;
this.chunks = that.chunks;
this.tableScan = that.tableScan;
this.endpointAffinities = that.endpointAffinities;
}
public static IcebergGroupScanBuilder builder() {
return new IcebergGroupScanBuilder();
}
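
/**
 * Returns a copy of this group scan with the projection replaced by
 * {@code columns}. The copy re-plans the table scan and its work units.
 */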
@Override
public IcebergGroupScan clone(List<SchemaPath> columns) {
try {
return toBuilder().columns(columns).build();
} catch (IOException e) {
throw new DrillRuntimeException(e);
}
}
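
/**
 * Returns a copy of this group scan with the pushed-down record limit applied.
 */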
@Override
public IcebergGroupScan applyLimit(int maxRecords) {
IcebergGroupScan clone = new IcebergGroupScan(this);
clone.maxRecords = maxRecords;
return clone;
}
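
/**
 * Maps the planned work units to the given drillbit endpoints using
 * {@link AssignmentCreator}.
 */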
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
mappings = AssignmentCreator.getMappings(endpoints, chunks);
}
private void createMappings(List<EndpointAffinity> affinities) {
List<DrillbitEndpoint> endpoints = affinities.stream()
.map(EndpointAffinity::getEndpoint)
.collect(Collectors.toList());
applyAssignments(endpoints);
}
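
/**
 * Builds the {@link IcebergSubScan} for one minor fragment, containing only
 * the work units assigned to that fragment. If {@link #applyAssignments} has
 * not been called yet, assignments are derived from the endpoint affinities.
 */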
@Override
public IcebergSubScan getSpecificScan(int minorFragmentId) {
if (mappings == null) {
createMappings(endpointAffinities);
}
assert minorFragmentId < mappings.size() : String.format(
"Mappings size [%d] should be greater than minor fragment id [%d] but it isn't.", mappings.size(),
minorFragmentId);
List<IcebergCompleteWork> workList = mappings.get(minorFragmentId);
Preconditions.checkArgument(!workList.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
IcebergSubScan subScan = IcebergSubScan.builder()
.userName(userName)
.formatPlugin(formatPlugin)
.columns(columns)
.condition(condition)
.schema(schema)
.workList(convertWorkList(workList))
.tableScan(tableScan)
.path(path)
.maxRecords(maxRecords)
.build();
subScan.setOperatorId(getOperatorId());
return subScan;
}
private List<IcebergWork> convertWorkList(List<IcebergCompleteWork> workList) {
return workList.stream()
.map(IcebergCompleteWork::getScanTask)
.map(IcebergWork::new)
.collect(Collectors.toList());
}
@JsonIgnore
public TableScan getTableScan() {
return tableScan;
}
@JsonProperty("maxRecords")
public int getMaxRecords() {
return maxRecords;
}
@Override
public int getMaxParallelizationWidth() {
return chunks.size();
}
@Override
public String getDigest() {
return toString();
}
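
/**
 * Estimates the scan size as one million rows per work unit, or
 * {@code maxRecords} rows per unit once a limit has been pushed down. Iceberg
 * planning does not expose an exact row count at this stage, hence
 * {@link ScanStats.GroupScanProperty#NO_EXACT_ROW_COUNT}.
 */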
@Override
public ScanStats getScanStats() {
int expectedRecordsPerChunk = 1_000_000;
if (maxRecords >= 0) {
expectedRecordsPerChunk = Math.max(maxRecords, 1);
}
// Use long arithmetic so the estimate cannot overflow for very large scans.
long estimatedRecords = chunks.size() * (long) expectedRecordsPerChunk;
return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0);
}
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new IcebergGroupScan(this);
}
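
/**
 * Plans the Iceberg scan: applies the column projection, groups the planned
 * scan tasks into block-aware work units, and computes their endpoint
 * affinities.
 */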
private void init() throws IOException {
tableScan = projectColumns(tableScan, columns);
chunks = new IcebergBlockMapBuilder(fs, formatPlugin.getContext().getBits())
.generateFileWork(tableScan.planTasks());
endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
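
/**
 * Applies the column projection to the table scan. A dynamic star in the
 * projection means all columns are read, so the scan is returned unchanged.
 */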
public static TableScan projectColumns(TableScan tableScan, List<SchemaPath> columns) {
boolean hasStar = columns.stream()
.anyMatch(SchemaPath::isDynamicStar);
if (!hasStar) {
List<String> projectColumns = columns.stream()
.map(IcebergGroupScan::getPath)
.collect(Collectors.toList());
return tableScan.select(projectColumns);
}
return tableScan;
}
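
/**
 * Converts a Drill {@link SchemaPath} into the dotted column name Iceberg
 * expects. Unnamed (array) segments map to Iceberg's {@code element}
 * sub-field, so {@code a.b[2]} becomes {@code a.b.element}.
 */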
public static String getPath(SchemaPath schemaPath) {
StringBuilder sb = new StringBuilder();
PathSegment segment = schemaPath.getRootSegment();
sb.append(segment.getNameSegment().getPath());
while ((segment = segment.getChild()) != null) {
sb.append('.')
.append(segment.isNamed()
? segment.getNameSegment().getPath()
: "element");
}
return sb.toString();
}
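
/**
 * Returns the endpoint affinities of the planned work units, computing them
 * on first use if necessary.
 */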
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (endpointAffinities == null) {
logger.debug("Chunks size: {}", chunks.size());
endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
return endpointAffinities;
}
@Override
public boolean supportsLimitPushdown() {
return false;
}
@Override
@JsonProperty("columns")
public List<SchemaPath> getColumns() {
return columns;
}
@JsonProperty("schema")
public TupleMetadata getSchema() {
return schema;
}
@JsonProperty("storage")
public StoragePluginConfig getStorageConfig() {
return formatPlugin.getStorageConfig();
}
@JsonProperty("format")
public FormatPluginConfig getFormatConfig() {
return formatPlugin.getConfig();
}
@JsonProperty("path")
public String getPath() {
return path;
}
@JsonProperty("condition")
public LogicalExpression getCondition() {
return condition;
}
@JsonIgnore
public IcebergFormatPlugin getFormatPlugin() {
return formatPlugin;
}
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("path", path)
.field("schema", schema)
.field("columns", columns)
.field("tableScan", tableScan)
.field("maxRecords", maxRecords)
.toString();
}
public IcebergGroupScanBuilder toBuilder() {
return new IcebergGroupScanBuilder()
.userName(this.userName)
.formatPlugin(this.formatPlugin)
.schema(this.schema)
.path(this.path)
.condition(this.condition)
.columns(this.columns)
.maxRecords(this.maxRecords);
}
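
/**
 * Fluent builder for {@link IcebergGroupScan}. A minimal usage sketch, with
 * an illustrative plugin instance and table path:
 * <pre>{@code
 * IcebergGroupScan scan = IcebergGroupScan.builder()
 *     .userName("drill")
 *     .formatPlugin(formatPlugin)      // an initialized IcebergFormatPlugin
 *     .path("/data/warehouse/events")  // illustrative table location
 *     .columns(Collections.singletonList(SchemaPath.STAR_COLUMN))
 *     .maxRecords(-1)                  // negative: no limit pushed down
 *     .build();
 * }</pre>
 */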
public static class IcebergGroupScanBuilder {
private String userName;
private IcebergFormatPlugin formatPlugin;
private TupleMetadata schema;
private String path;
private LogicalExpression condition;
private List<SchemaPath> columns;
private int maxRecords;
public IcebergGroupScanBuilder userName(String userName) {
this.userName = userName;
return this;
}
public IcebergGroupScanBuilder formatPlugin(IcebergFormatPlugin formatPlugin) {
this.formatPlugin = formatPlugin;
return this;
}
public IcebergGroupScanBuilder schema(TupleMetadata schema) {
this.schema = schema;
return this;
}
public IcebergGroupScanBuilder path(String path) {
this.path = path;
return this;
}
public IcebergGroupScanBuilder condition(LogicalExpression condition) {
this.condition = condition;
return this;
}
public IcebergGroupScanBuilder columns(List<SchemaPath> columns) {
this.columns = columns;
return this;
}
public IcebergGroupScanBuilder maxRecords(int maxRecords) {
this.maxRecords = maxRecords;
return this;
}
public IcebergGroupScan build() throws IOException {
return new IcebergGroupScan(this);
}
}
}