blob: 39dc0733e1f9bd3a77b1b65b04c602a6f81f24ff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import org.apache.drill.exec.util.ImpersonationUtil;
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractFileGroupScan {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);

  // Files (and directories) this scan covers; replaced in-place by partition pruning
  // via modifyFileSelection().
  private FileSelection selection;
  private final EasyFormatPlugin<?> formatPlugin;
  // Upper bound on parallelization: one minor fragment per file-work chunk.
  private int maxWidth;
  // Projected columns; ALL_COLUMNS when the query selects everything.
  private List<SchemaPath> columns;

  // minorFragmentId -> file-work units, built lazily by applyAssignments().
  private ListMultimap<Integer, CompleteFileWork> mappings;
  // One entry per unit of work (a whole file, or a block when the format is splittable).
  private List<CompleteFileWork> chunks;
  // Drillbit affinity derived from HDFS block locations of the chunks.
  private List<EndpointAffinity> endpointAffinities;
  private String selectionRoot;

  /**
   * Deserialization constructor used by Jackson when a plan is read back.
   * Resolves the format plugin from the injected registry and rebuilds the
   * block map from the listed files.
   */
  @JsonCreator
  public EasyGroupScan(
      @JsonProperty("userName") String userName,
      @JsonProperty("files") List<String> files, //
      @JsonProperty("storage") StoragePluginConfig storageConfig, //
      @JsonProperty("format") FormatPluginConfig formatConfig, //
      @JacksonInject StoragePluginRegistry engineRegistry, //
      @JsonProperty("columns") List<SchemaPath> columns,
      @JsonProperty("selectionRoot") String selectionRoot
      ) throws IOException, ExecutionSetupException {
    this(ImpersonationUtil.resolveUserName(userName),
        new FileSelection(files, true),
        (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
        columns,
        selectionRoot);
  }

  /** Convenience constructor that projects all columns. */
  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
      throws IOException {
    this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
  }

  /**
   * Primary constructor.
   *
   * @param userName      user the scan is executed as (used for impersonation)
   * @param selection     files/directories to scan; must not be null
   * @param formatPlugin  format plugin resolved for this scan; must not be null
   * @param columns       projected columns; null/empty means all columns
   * @param selectionRoot root path of the selection, used for partition columns
   * @throws IOException if listing the file statuses or building the block map fails
   */
  public EasyGroupScan(
      String userName,
      FileSelection selection, //
      EasyFormatPlugin<?> formatPlugin, //
      List<SchemaPath> columns,
      String selectionRoot
      ) throws IOException {
    super(userName);
    this.selection = Preconditions.checkNotNull(selection);
    this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
    this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
    this.selectionRoot = selectionRoot;
    initFromSelection(selection, formatPlugin);
  }

  /**
   * Copy constructor used by getNewWithChildren() and the clone() variants.
   * Shares the already-computed chunks/affinities/mappings rather than
   * re-reading the file system.
   */
  private EasyGroupScan(EasyGroupScan that) {
    super(that.getUserName());
    selection = that.selection;
    formatPlugin = that.formatPlugin;
    columns = that.columns;
    selectionRoot = that.selectionRoot;
    chunks = that.chunks;
    endpointAffinities = that.endpointAffinities;
    maxWidth = that.maxWidth;
    mappings = that.mappings;
  }

  /**
   * (Re)computes the per-file work units, parallelization width and endpoint
   * affinities for the given selection. Uses a file system created as the
   * scan's user so listing honors impersonation.
   */
  private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
    final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
    this.selection = selection;
    BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
    // Splittable formats may yield multiple chunks per file (one per block range).
    this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
    this.maxWidth = chunks.size();
    this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
  }

  public String getSelectionRoot() {
    return selectionRoot;
  }

  @Override
  public int getMaxParallelizationWidth() {
    return maxWidth;
  }

  @Override
  public ScanStats getScanStats() {
    long data = 0;
    for (CompleteFileWork work : chunks) {
      data += work.getTotalBytes();
    }
    // Rough estimate: assume ~1KB per row; exact counts are unknown for easy formats.
    long estRowCount = data / 1024;
    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
  }

  @JsonProperty("files")
  public List<String> getFiles() {
    return selection.getAsFiles();
  }

  @JsonProperty("columns")
  public List<SchemaPath> getColumns() {
    return columns;
  }

  @JsonIgnore
  public FileSelection getFileSelection() {
    return selection;
  }

  @Override
  public void modifyFileSelection(FileSelection selection) {
    this.selection = selection;
  }

  @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
    assert children == null || children.isEmpty();
    return new EasyGroupScan(this);
  }

  @Override
  public List<EndpointAffinity> getOperatorAffinity() {
    // Lazily recompute affinities if they were not carried over (e.g. after deserialization).
    if (endpointAffinities == null) {
      logger.debug("chunks: {}", chunks.size());
      endpointAffinities = AffinityCreator.getAffinityMap(chunks);
    }
    return endpointAffinities;
  }

  @Override
  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
    mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks, formatPlugin.getContext());
  }

  /** Derives an endpoint list from the given affinities and assigns work to them. */
  private void createMappings(List<EndpointAffinity> affinities) {
    List<DrillbitEndpoint> endpoints = Lists.newArrayList();
    for (EndpointAffinity e : affinities) {
      endpoints.add(e.getEndpoint());
    }
    this.applyAssignments(endpoints);
  }

  @Override
  public EasySubScan getSpecificScan(int minorFragmentId) {
    if (mappings == null) {
      // Use the accessor rather than the field: getOperatorAffinity() lazily
      // initializes endpointAffinities, so this cannot NPE on an uncomputed field.
      createMappings(getOperatorAffinity());
    }
    assert minorFragmentId < mappings.size() : String.format(
        "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
        minorFragmentId);

    List<CompleteFileWork> filesForMinor = mappings.get(minorFragmentId);

    // Lazy message formatting; renders identically to the previous String.format version.
    Preconditions.checkArgument(!filesForMinor.isEmpty(),
        "MinorFragmentId %s has no read entries assigned", minorFragmentId);

    return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
  }

  /** Strips the location metadata, keeping only the (path, start, length) triples the sub-scan needs. */
  private List<FileWorkImpl> convert(List<CompleteFileWork> list) {
    List<FileWorkImpl> newList = Lists.newArrayList();
    for (CompleteFileWork f : list) {
      newList.add(f.getAsFileWork());
    }
    return newList;
  }

  @JsonProperty("storage")
  public StoragePluginConfig getStorageConfig() {
    return formatPlugin.getStorageConfig();
  }

  @JsonProperty("format")
  public FormatPluginConfig getFormatConfig() {
    return formatPlugin.getConfig();
  }

  @Override
  public String toString() {
    final String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s]";
    return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles());
  }

  @Override
  public String getDigest() {
    return toString();
  }

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    if (!formatPlugin.supportsPushDown()) {
      throw new IllegalStateException(String.format("%s doesn't support pushdown.", this.getClass().getSimpleName()));
    }
    EasyGroupScan newScan = new EasyGroupScan(this);
    newScan.columns = columns;
    return newScan;
  }

  @Override
  public FileGroupScan clone(FileSelection selection) throws IOException {
    EasyGroupScan newScan = new EasyGroupScan(this);
    newScan.initFromSelection(selection, formatPlugin);
    newScan.mappings = null; /* the mapping will be created later when we get specific scan
    since the end-point affinities are not known at this time */
    return newScan;
  }

  @Override
  @JsonIgnore
  public boolean canPushdownProjects(List<SchemaPath> columns) {
    return formatPlugin.supportsPushDown();
  }
}