blob: 00d510080a4342eb88aaed735cfcd1086b23503a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.openTSDB;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
import org.apache.drill.exec.store.openTSDB.client.Schema;
import org.apache.drill.exec.store.openTSDB.client.Service;
import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
/**
 * Record reader that pulls time-series data from an openTSDB instance and
 * copies it into Drill value vectors. One Drill row is produced per data
 * point of each metric returned by the sub-scan's query.
 */
public class OpenTSDBRecordReader extends AbstractRecordReader {

  private static final Logger logger = LoggerFactory.getLogger(OpenTSDBRecordReader.class);

  // Batch size should not exceed the max record count Drill allows per batch.
  private static final int TARGET_RECORD_COUNT = 4000;

  // Maps openTSDB column types to the Drill minor types backing their value vectors.
  private static final Map<OpenTSDBTypes, MinorType> TYPES =
      ImmutableMap.<OpenTSDBTypes, MinorType>builder()
          .put(OpenTSDBTypes.STRING, MinorType.VARCHAR)
          .put(OpenTSDBTypes.DOUBLE, MinorType.FLOAT8)
          .put(OpenTSDBTypes.TIMESTAMP, MinorType.TIMESTAMP)
          .build();

  private final Service db;
  // Query parameters (metric name, aggregator, ...) parsed from the sub-scan's table name.
  private final Map<String, String> params;

  private Iterator<MetricDTO> tableIterator;
  private OutputMutator output;
  // Lazily built on the first batch, once the openTSDB schema is known.
  private ImmutableList<ProjectedColumnInfo> projectedCols;

  /**
   * Pairs a value vector with the openTSDB column it is populated from.
   */
  private static class ProjectedColumnInfo {
    ValueVector vv;
    ColumnDTO openTSDBColumn;
  }

  public OpenTSDBRecordReader(Service client, OpenTSDBSubScan.OpenTSDBSubScanSpec subScanSpec,
                              List<SchemaPath> projectedColumns) {
    setColumns(projectedColumns);
    this.db = client;
    this.params = fromRowData(subScanSpec.getTableName());
    logger.debug("Scan spec: {}", subScanSpec);
  }

  @Override
  public void setup(OperatorContext context, OutputMutator output) {
    this.output = output;
    Set<MetricDTO> metrics = db.getAllMetrics(params);
    if (metrics == null) {
      throw UserException.validationError()
          .message(String.format("Table '%s' not found", params.get(METRIC_PARAM)))
          .build(logger);
    }
    this.tableIterator = metrics.iterator();
  }

  @Override
  public int next() {
    try {
      return processOpenTSDBTablesData();
    } catch (SchemaChangeException e) {
      throw new DrillRuntimeException(e);
    }
  }

  @Override
  public void close() {
    // Nothing to release here: the Service connection is owned by the storage
    // plugin and the value vectors are managed by the output mutator.
  }

  /**
   * Drains metrics from the iterator into the value vectors until either the
   * iterator is exhausted or the target batch size is reached. Note that all
   * data points of the metric that crosses the threshold are still written.
   *
   * @return number of records written into the current batch
   */
  private int processOpenTSDBTablesData() throws SchemaChangeException {
    int rowCounter = 0;
    while (tableIterator.hasNext() && rowCounter < TARGET_RECORD_COUNT) {
      rowCounter = addRowResult(tableIterator.next(), rowCounter);
    }
    return rowCounter;
  }

  /**
   * Writes one Drill row per data point (timestamp -> value) of the given metric.
   *
   * @param table      metric whose data points are appended
   * @param rowCounter index of the next row to write
   * @return row index after all data points have been written
   */
  private int addRowResult(MetricDTO table, int rowCounter) throws SchemaChangeException {
    setupProjectedColsIfItNull();
    // Iterate entries directly instead of keySet() + get() to avoid a second lookup per point.
    for (Map.Entry<String, String> dataPoint : table.getDps().entrySet()) {
      setupDataToDrillTable(table, dataPoint.getKey(), dataPoint.getValue(), table.getTags(), rowCounter);
      rowCounter++;
    }
    return rowCounter;
  }

  private void setupProjectedColsIfItNull() throws SchemaChangeException {
    if (projectedCols == null) {
      initCols(new Schema(db, params.get(METRIC_PARAM)));
    }
  }

  /**
   * Copies one data point into every projected column at the given row index.
   * Columns not matched by name are treated as tag columns.
   */
  private void setupDataToDrillTable(MetricDTO table, String timestamp, String value, Map<String, String> tags, int rowCount) {
    for (ProjectedColumnInfo pci : projectedCols) {
      switch (pci.openTSDBColumn.getColumnName()) {
        case "metric":
          setStringColumnValue(table.getMetric(), pci, rowCount);
          break;
        case "aggregate tags":
          setStringColumnValue(table.getAggregateTags().toString(), pci, rowCount);
          break;
        case "timestamp":
          setTimestampColumnValue(timestamp, pci, rowCount);
          break;
        case "aggregated value":
          setDoubleColumnValue(value, pci, rowCount);
          break;
        default:
          // Tag value may be absent for this metric; setStringColumnValue handles null.
          setStringColumnValue(tags.get(pci.openTSDBColumn.getColumnName()), pci, rowCount);
      }
    }
  }

  private void setTimestampColumnValue(String timestamp, ProjectedColumnInfo pci, int rowCount) {
    // A missing timestamp defaults to epoch zero.
    setTimestampColumnValue(timestamp != null ? Long.parseLong(timestamp) : 0L, pci, rowCount);
  }

  private void setDoubleColumnValue(String value, ProjectedColumnInfo pci, int rowCount) {
    setDoubleColumnValue(value != null ? Double.parseDouble(value) : 0.0, pci, rowCount);
  }

  private void setStringColumnValue(String data, ProjectedColumnInfo pci, int rowCount) {
    if (data == null) {
      data = "null";
    }
    ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
    ((NullableVarCharVector.Mutator) pci.vv.getMutator())
        .setSafe(rowCount, value, 0, value.remaining());
  }

  private void setTimestampColumnValue(long data, ProjectedColumnInfo pci, int rowCount) {
    // openTSDB reports seconds; Drill TIMESTAMP vectors store milliseconds.
    ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
        .setSafe(rowCount, data * 1000);
  }

  private void setDoubleColumnValue(double data, ProjectedColumnInfo pci, int rowCount) {
    ((NullableFloat8Vector.Mutator) pci.vv.getMutator())
        .setSafe(rowCount, data);
  }

  /**
   * Creates one value vector per column of the openTSDB schema and caches the
   * resulting projection list.
   *
   * @throws SchemaChangeException if a vector cannot be added to the output mutator
   */
  private void initCols(Schema schema) throws SchemaChangeException {
    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
    for (int i = 0; i < schema.getColumnCount(); i++) {
      ColumnDTO column = schema.getColumnByIndex(i);
      final String name = column.getColumnName();
      final OpenTSDBTypes type = column.getColumnType();
      TypeProtos.MinorType minorType = TYPES.get(type);
      if (minorType == null) {
        // Pass 'type' to %s rather than calling toString() so a null type
        // produces a readable message instead of an NPE.
        String message = String.format(
            "A column you queried has a data type that is not currently supported by the OpenTSDB storage plugin. "
                + "The column's name was %s and its OpenTSDB data type was %s. ", name, type);
        throw UserException.unsupportedError()
            .message(message)
            .build(logger);
      }
      pciBuilder.add(getProjectedColumnInfo(column, name, minorType));
    }
    projectedCols = pciBuilder.build();
  }

  /**
   * Builds the projection info for a single column: materializes the field,
   * allocates its value vector, and pairs it with the column metadata.
   */
  private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, String name, MinorType minorType) throws SchemaChangeException {
    MajorType majorType = Types.optional(minorType);
    MaterializedField field = MaterializedField.create(name, majorType);
    final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
        minorType, majorType.getMode());
    ValueVector vector = output.addField(field, clazz);
    vector.allocateNew();

    ProjectedColumnInfo pci = new ProjectedColumnInfo();
    pci.vv = vector;
    pci.openTSDBColumn = column;
    return pci;
  }
}