blob: 96ef51b5df5de6ff70479c6f07a34dc826f63df4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.JSONBaseStatisticsRecordWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.metastore.statistics.Statistic;

import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
 * Collects per-column statistics while a statistics record batch is written out.
 * <p>
 * For each statistics record ({@link #startStatisticsRecord()} /
 * {@link #endStatisticsRecord()}) a {@link DrillStatsTable.ColumnStatistics_v1}
 * instance is populated by the field converters below and appended to
 * {@link #columnStatisticsList}. The converters communicate through
 * {@link #nextField}: {@code startField()} records which statistic is being
 * written, {@code writeField()} consumes it, and {@code endField()} clears it.
 * <p>
 * NOTE(review): the converters mutate enclosing-instance state, so this class
 * is not thread-safe; it assumes a single writer thread.
 */
public class StatisticsCollectorImpl extends JSONBaseStatisticsRecordWriter {

  // One entry per completed statistics record (i.e. per column).
  private final List<DrillStatsTable.ColumnStatistics> columnStatisticsList = new ArrayList<>();
  // Name of the statistic field currently in flight; null outside startField()/endField().
  private String nextField = null;
  // Statistics object for the record currently being written.
  private DrillStatsTable.ColumnStatistics columnStatistics;
  // Most recent "computed" date seen across all records; serves as the
  // directory-level computed time. Remains null until a COMPUTED field is read.
  private LocalDate dirComputedTime = null;
  // Set to true when a converter encounters a field it does not recognize.
  private boolean errStatus = false;

  /** Begins a new per-column statistics record. */
  @Override
  public void startStatisticsRecord() {
    columnStatistics = new DrillStatsTable.ColumnStatistics_v1();
  }

  /** Finishes the current record and adds it to the collected list. */
  @Override
  public void endStatisticsRecord() {
    columnStatisticsList.add(columnStatistics);
  }

  /** Returns true if at least one statistics record has been collected. */
  @Override
  public boolean hasStatistics() {
    return !columnStatisticsList.isEmpty();
  }

  /**
   * Builds the table-level statistics from the collected per-column statistics.
   *
   * @return the generated directory-structure statistics
   * @throws NullPointerException if no COMPUTED (date) field was ever read, i.e.
   *         no statistics records were processed
   */
  public DrillStatsTable.TableStatistics getStatistics() {
    // Fail with a diagnostic instead of a bare NPE on dirComputedTime.toString().
    Objects.requireNonNull(dirComputedTime,
        "Statistics computed time is not set: no statistics records were collected");
    return DrillStatsTable.generateDirectoryStructure(dirComputedTime.toString(),
        columnStatisticsList);
  }

  /** Returns true if any converter encountered an unexpected field. */
  public boolean hasErrors() {
    return errStatus;
  }

  @Override
  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new BigIntJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new IntJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
    return new DateJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
    return new VarCharJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableBigIntJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableVarBinaryConverter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableVarBinaryJsonConverter(fieldId, fieldName, reader);
  }

  @Override
  public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
    return new NullableFloat8JsonConverter(fieldId, fieldName, reader);
  }

  /**
   * Consumes required BIGINT statistic fields (schema, row counts, NDV,
   * average width, duplicate sum) into the current column statistics.
   */
  public class BigIntJsonConverter extends FieldConverter {

    public BigIntJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      switch (fieldName) {
        // Intentional fall-through: all recognized names just arm nextField.
        case Statistic.SCHEMA:
        case Statistic.ROWCOUNT:
        case Statistic.NNROWCOUNT:
        case Statistic.NDV:
        case Statistic.AVG_WIDTH:
        case Statistic.SUM_DUPS:
          nextField = fieldName;
      }
    }

    @Override
    public void writeField() throws IOException {
      if (nextField == null) {
        errStatus = true;
        throw new IOException("Statistics writer encountered unexpected field");
      }
      DrillStatsTable.ColumnStatistics_v1 columnStatistics =
          (DrillStatsTable.ColumnStatistics_v1) StatisticsCollectorImpl.this.columnStatistics;
      switch (nextField) {
        case Statistic.SCHEMA:
          columnStatistics.setSchema(reader.readLong());
          break;
        case Statistic.ROWCOUNT:
          columnStatistics.setCount(reader.readLong());
          break;
        case Statistic.NNROWCOUNT:
          columnStatistics.setNonNullCount(reader.readLong());
          break;
        case Statistic.NDV:
          columnStatistics.setNdv(reader.readLong());
          break;
        case Statistic.AVG_WIDTH:
          columnStatistics.setAvgWidth(reader.readLong());
          break;
        case Statistic.SUM_DUPS:
          // Ignore Count_Approx_Dups statistic
          break;
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Recognizes the INT column-type statistic but deliberately writes nothing:
   * the type is carried by the VARCHAR COLTYPE field instead.
   */
  public class IntJsonConverter extends FieldConverter {

    public IntJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      if (fieldName.equals(Statistic.COLTYPE)) {
        nextField = fieldName;
      }
    }

    @Override
    public void writeField() throws IOException {
      if (nextField == null) {
        errStatus = true;
        throw new IOException("Statistics writer encountered unexpected field");
      }
      if (nextField.equals(Statistic.COLTYPE)) {
        // Do not write out the type
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Tracks the COMPUTED date statistic, keeping the latest date seen as the
   * directory-level computed time.
   */
  public class DateJsonConverter extends FieldConverter {

    public DateJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      if (fieldName.equals(Statistic.COMPUTED)) {
        nextField = fieldName;
      }
    }

    @Override
    public void writeField() throws IOException {
      if (nextField == null) {
        errStatus = true;
        throw new IOException("Statistics writer encountered unexpected field");
      }
      if (nextField.equals(Statistic.COMPUTED)) {
        LocalDate computedTime = reader.readLocalDate();
        // Keep the most recent computed time across all records.
        if (dirComputedTime == null
            || computedTime.compareTo(dirComputedTime) > 0) {
          dirComputedTime = computedTime;
        }
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Consumes VARCHAR statistic fields: the (escaped) column name and the
   * JSON-serialized column type.
   */
  public class VarCharJsonConverter extends FieldConverter {

    public VarCharJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      switch (fieldName) {
        // Intentional fall-through: both recognized names arm nextField.
        case Statistic.COLNAME:
        case Statistic.COLTYPE:
          nextField = fieldName;
      }
    }

    @Override
    public void writeField() throws IOException {
      if (nextField == null) {
        errStatus = true;
        throw new IOException("Statistics writer encountered unexpected field");
      }
      switch (nextField) {
        case Statistic.COLNAME:
          // column name is escaped, so SchemaPath.parseFromString() should be used here
          ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setName(SchemaPath.parseFromString(reader.readText().toString()));
          break;
        case Statistic.COLTYPE:
          TypeProtos.MajorType fieldType = DrillStatsTable.getMapper().readValue(reader.readText().toString(), TypeProtos.MajorType.class);
          ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setType(fieldType);
          break;
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Consumes nullable BIGINT statistic fields (row counts, NDV, duplicate
   * sum), honoring the writer's skip-null-fields setting.
   */
  public class NullableBigIntJsonConverter extends FieldConverter {

    public NullableBigIntJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        switch (fieldName) {
          // Intentional fall-through: all recognized names arm nextField.
          case Statistic.ROWCOUNT:
          case Statistic.NNROWCOUNT:
          case Statistic.NDV:
          case Statistic.SUM_DUPS:
            nextField = fieldName;
            break;
        }
      }
    }

    @Override
    public void writeField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        if (nextField == null) {
          errStatus = true;
          throw new IOException("Statistics writer encountered unexpected field");
        }
        switch (nextField) {
          case Statistic.ROWCOUNT:
            ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setCount(reader.readLong());
            break;
          case Statistic.NNROWCOUNT:
            ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setNonNullCount(reader.readLong());
            break;
          case Statistic.NDV:
            ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setNdv(reader.readLong());
            break;
          case Statistic.SUM_DUPS:
            // Ignore Count_Approx_Dups statistic
            break;
        }
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Consumes nullable VARBINARY statistic fields. HLL payloads are recognized
   * but skipped; the t-digest payload feeds the column histogram.
   */
  public class NullableVarBinaryJsonConverter extends FieldConverter {

    public NullableVarBinaryJsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        switch (fieldName) {
          // Intentional fall-through: all recognized names arm nextField.
          case Statistic.HLL:
          case Statistic.HLL_MERGE:
          case Statistic.TDIGEST_MERGE:
            nextField = fieldName;
            break;
        }
      }
    }

    @Override
    public void writeField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        if (nextField == null) {
          errStatus = true;
          throw new IOException("Statistics writer encountered unexpected field");
        }
        switch (nextField) {
          case Statistic.HLL:
          case Statistic.HLL_MERGE:
            // Do NOT write out the HLL output, since it is not used yet for computing statistics for a
            // subset of partitions in the query OR for computing NDV with incremental statistics.
            break;
          case Statistic.TDIGEST_MERGE:
            byte[] tdigestBytes = reader.readByteArray();
            ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).buildHistogram(tdigestBytes);
            break;
        }
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }

  /**
   * Consumes the nullable FLOAT8 average-width statistic, honoring the
   * writer's skip-null-fields setting.
   */
  public class NullableFloat8JsonConverter extends FieldConverter {

    public NullableFloat8JsonConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void startField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        if (fieldName.equals(Statistic.AVG_WIDTH)) {
          nextField = fieldName;
        }
      }
    }

    @Override
    public void writeField() throws IOException {
      if (!skipNullFields || this.reader.isSet()) {
        if (nextField == null) {
          errStatus = true;
          throw new IOException("Statistics writer encountered unexpected field");
        }
        if (nextField.equals(Statistic.AVG_WIDTH)) {
          ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).setAvgWidth(reader.readDouble());
        }
      }
    }

    @Override
    public void endField() {
      nextField = null;
    }
  }
}