// blob: 29ce724629a73575208f34a575bc91b92fd352df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.mock;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.mock.MockTableDef.MockColumn;
import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
/**
* Describes a "group" scan of a (logical) mock table. The mock table has a
* schema described by the {@link MockScanEntry}. Class. To simulate a scan that
* can be parallelized, this group scan can contain a list of
* {@link MockScanEntry}, each of which simulates a separate file on disk, or
* block within a file. Each will give rise to a separate minor fragment
* (assuming sufficient parallelization.)
*/
@JsonTypeName("mock-scan")
public class MockGroupScanPOP extends AbstractGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
.getLogger(MockGroupScanPOP.class);
/**
* URL for the scan. Unused. Appears to be a vestige of an earlier design that
* required them.
*/
private final String url;
/**
* The set of simulated files to scan.
*/
protected final List<MockScanEntry> readEntries;
private LinkedList<MockScanEntry>[] mappings;
/**
* Whether this group scan uses a newer "extended" schema definition, or the
* original (non-extended) definition.
*/
private boolean extended;
private ScanStats scanStats = ScanStats.TRIVIAL_TABLE;
@JsonCreator
public MockGroupScanPOP(@JsonProperty("url") String url,
@JsonProperty("entries") List<MockScanEntry> readEntries) {
super((String) null);
this.readEntries = readEntries;
this.url = url;
// Compute decent row-count stats for this mock data source so that
// the planner is "fooled" into thinking that this operator will do
// disk I/O.
double rowCount = 0;
int rowWidth = 0;
// Can have multiple "read entries" which simulate blocks or
// row groups.
for (MockScanEntry entry : readEntries) {
rowCount += entry.getRecords();
int groupRowWidth = 0;
if (entry.getTypes() == null) {
// If no columns, assume a row width.
groupRowWidth = 50;
} else {
// The normal case: we do have columns. Use them
// to compute the row width.
for (MockColumn col : entry.getTypes()) {
int colWidth = 0;
if (col.getWidthValue() == 0) {
// Fixed width columns
colWidth = TypeHelper.getSize(col.getMajorType());
} else {
// Variable width columns with a specified column
// width
colWidth = col.getWidthValue();
}
// Columns can repeat
colWidth *= col.getRepeatCount();
groupRowWidth += colWidth;
}
}
// Overall row width is the greatest group row width.
rowWidth = Math.max(rowWidth, groupRowWidth);
}
double dataSize = rowCount * rowWidth;
scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
rowCount,
DrillCostBase.BASE_CPU_COST * dataSize,
DrillCostBase.BYTE_DISK_READ_COST * dataSize);
}
@Override
public ScanStats getScanStats() {
return scanStats;
}
public String getUrl() {
return url;
}
@JsonProperty("entries")
public List<MockScanEntry> getReadEntries() {
return readEntries;
}
@SuppressWarnings("unchecked")
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
mappings = new LinkedList[endpoints.size()];
int i = 0;
for (MockScanEntry e : this.getReadEntries()) {
if (i == endpoints.size()) {
i -= endpoints.size();
}
LinkedList<MockScanEntry> entries = mappings[i];
if (entries == null) {
entries = new LinkedList<MockScanEntry>();
mappings[i] = entries;
}
entries.add(e);
i++;
}
}
@Override
public SubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < mappings.length : String.format(
"Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
mappings.length, minorFragmentId);
return new MockSubScanPOP(url, extended, mappings[minorFragmentId]);
}
@Override
public int getMaxParallelizationWidth() {
return readEntries.size();
}
@Override
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MockGroupScanPOP(url, readEntries);
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
if (columns.isEmpty()) {
throw new IllegalArgumentException("No columns for mock scan");
}
List<MockColumn> mockCols = new ArrayList<>();
Pattern p = Pattern.compile("(\\w+)_([isdb])(\\d*)");
for (SchemaPath path : columns) {
String col = path.getLastSegment().getNameSegment().getPath();
if (SchemaPath.DYNAMIC_STAR.equals(col)) {
return this;
}
Matcher m = p.matcher(col);
if (!m.matches()) {
throw new IllegalArgumentException(
"Badly formatted mock column name: " + col);
}
@SuppressWarnings("unused")
String name = m.group(1);
String type = m.group(2);
String length = m.group(3);
int width = 10;
if (!length.isEmpty()) {
width = Integer.parseInt(length);
}
MinorType minorType;
switch (type) {
case "i":
minorType = MinorType.INT;
break;
case "s":
minorType = MinorType.VARCHAR;
break;
case "d":
minorType = MinorType.FLOAT8;
break;
case "b":
minorType = MinorType.BIT;
break;
default:
throw new IllegalArgumentException(
"Unsupported field type " + type + " for mock column " + col);
}
MockTableDef.MockColumn mockCol = new MockColumn(
col, minorType, DataMode.REQUIRED, width, 0, 0, null, 1, null);
mockCols.add(mockCol);
}
MockScanEntry entry = readEntries.get(0);
MockColumn types[] = new MockColumn[mockCols.size()];
mockCols.toArray(types);
MockScanEntry newEntry = new MockScanEntry(entry.records, true, 0, 1, types);
List<MockScanEntry> newEntries = new ArrayList<>();
newEntries.add(newEntry);
return new MockGroupScanPOP(url, newEntries);
}
@Override
public String getDigest() {
return toString();
}
@Override
public String toString() {
return "MockGroupScanPOP [url=" + url + ", readEntries=" + readEntries
+ "]";
}
@Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
}
}