blob: 7ab4b4d6bc82d5ca797ebe677b3d0030c18fc19a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.openTSDB;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.openTSDB.OpenTSDBSubScan.OpenTSDBSubScanSpec;
import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl;
import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
@JsonTypeName("openTSDB-scan")
public class OpenTSDBGroupScan extends AbstractGroupScan {
private final OpenTSDBStoragePluginConfig storagePluginConfig;
private final OpenTSDBScanSpec openTSDBScanSpec;
private final OpenTSDBStoragePlugin storagePlugin;
private List<SchemaPath> columns;
@JsonCreator
public OpenTSDBGroupScan(@JsonProperty("openTSDBScanSpec") OpenTSDBScanSpec openTSDBScanSpec,
@JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
this(pluginRegistry.resolve(openTSDBStoragePluginConfig, OpenTSDBStoragePlugin.class),
openTSDBScanSpec, columns);
}
public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin,
OpenTSDBScanSpec scanSpec, List<SchemaPath> columns) {
super((String) null);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.openTSDBScanSpec = scanSpec;
this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
}
/**
* Private constructor, used for cloning.
*
* @param that The OpenTSDBGroupScan to clone
*/
private OpenTSDBGroupScan(OpenTSDBGroupScan that) {
super((String) null);
this.columns = that.columns;
this.openTSDBScanSpec = that.openTSDBScanSpec;
this.storagePlugin = that.storagePlugin;
this.storagePluginConfig = that.storagePluginConfig;
}
@Override
public int getMaxParallelizationWidth() {
return 1;
}
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
}
@Override
public OpenTSDBSubScan getSpecificScan(int minorFragmentId) {
List<OpenTSDBSubScanSpec> scanSpecList = Lists.newArrayList();
scanSpecList.add(new OpenTSDBSubScanSpec(getTableName()));
return new OpenTSDBSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
}
@Override
public ScanStats getScanStats() {
ServiceImpl client = storagePlugin.getClient();
Map<String, String> params = fromRowData(openTSDBScanSpec.getTableName());
Set<MetricDTO> allMetrics = client.getAllMetrics(params);
long numMetrics = allMetrics.size();
float approxDiskCost = 0;
if (numMetrics != 0) {
MetricDTO metricDTO = allMetrics.iterator().next();
// This method estimates the sizes of Java objects (number of bytes of memory they occupy).
// more detailed information about how this estimation method work you can find in this article
// http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
approxDiskCost = SizeEstimator.estimate(metricDTO) * numMetrics;
}
return new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, numMetrics, 1, approxDiskCost);
}
@Override
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new OpenTSDBGroupScan(this);
}
@Override
public String getDigest() {
return toString();
}
@Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
}
@JsonIgnore
public String getTableName() {
return getOpenTSDBScanSpec().getTableName();
}
@JsonProperty
public OpenTSDBScanSpec getOpenTSDBScanSpec() {
return openTSDBScanSpec;
}
@JsonProperty("storage")
public OpenTSDBStoragePluginConfig getStoragePluginConfig() {
return storagePluginConfig;
}
@Override
@JsonProperty
public List<SchemaPath> getColumns() {
return columns;
}
@Override
public GroupScan clone(List<SchemaPath> columns) {
OpenTSDBGroupScan newScan = new OpenTSDBGroupScan(this);
newScan.columns = columns;
return newScan;
}
@Override
public String toString() {
return "OpenTSDBGroupScan [OpenTSDBScanSpec=" + openTSDBScanSpec + ", columns=" + columns
+ "]";
}
}