blob: b70ded9f063536b705263277cbc71ed52f25b98d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.impl;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
import org.apache.iotdb.commons.schema.tree.SchemaIterator;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.visitor.CompleteMeasurementSchemaVisitor;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.ISchemaReader;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.filter.FilterContainsVisitor;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.filter.TimeseriesFilterVisitor;
import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor;
import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
public class TimeseriesReaderWithViewFetch implements ISchemaReader<ITimeSeriesSchemaInfo> {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeseriesReaderWithViewFetch.class);
private final SchemaIterator<ITimeSeriesSchemaInfo> iterator;
private final Queue<ITimeSeriesSchemaInfo> cachedViewList = new ArrayDeque<>();
private ITimeSeriesSchemaInfo next = null;
private boolean consumeView = false;
private final SchemaFilter schemaFilter;
/**
* There is no need to pull the original sequence information from the view if needFetch is false.
* The default is false if not filtered by DataType.
*/
private final boolean needFetch;
/**
* If isBlocked is null, it means the next is not fetched yet. If isBlocked.isDone() is false, it
* means the next is being fetched. If isBlocked.get() is true, it means hasNext, otherwise, it
* means no more info to be fetched.
*/
private ListenableFuture<Boolean> isBlocked = null;
private static final int BATCH_CACHED_SIZE = 1000;
private static final TimeseriesFilterVisitor FILTER_VISITOR = new TimeseriesFilterVisitor();
public TimeseriesReaderWithViewFetch(
SchemaIterator<ITimeSeriesSchemaInfo> iterator, SchemaFilter schemaFilter) {
this.iterator = iterator;
this.schemaFilter = schemaFilter;
this.needFetch = new FilterContainsVisitor().process(schemaFilter, SchemaFilterType.DATA_TYPE);
}
public TimeseriesReaderWithViewFetch(
SchemaIterator<ITimeSeriesSchemaInfo> iterator,
SchemaFilter schemaFilter,
boolean needViewDetail) {
this.iterator = iterator;
this.schemaFilter = schemaFilter;
this.needFetch =
needViewDetail
|| new FilterContainsVisitor().process(schemaFilter, SchemaFilterType.DATA_TYPE);
}
@Override
public boolean isSuccess() {
return iterator.isSuccess();
}
@Override
public Throwable getFailure() {
return iterator.getFailure();
}
@Override
public void close() {
iterator.close();
}
/**
* Fetch ITimeSeriesSchemaInfo from the iterator and return only in the following three cases
*
* <ol>
* <li>successfully fetched an info of normal time series. consumeView is false and next is not
* null.
* <li>successfully fetched batch info of view time series. consumeView is true and next is
* null.
* <li>no more info to be fetched. consumeView is false and next is null.
* </ol>
*/
@Override
public ListenableFuture<Boolean> isBlocked() {
if (isBlocked != null) {
return isBlocked;
}
ListenableFuture<Boolean> res = NOT_BLOCKED_FALSE;
if (consumeView) {
// consume view list
res = NOT_BLOCKED_TRUE;
} else if (next == null) {
// get next from iterator
ITimeSeriesSchemaInfo temp;
while (iterator.hasNext()) {
temp = iterator.next();
if (needFetch && temp.isLogicalView()) {
// view timeseries
cachedViewList.add(temp.snapshot());
if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
res = asyncGetNext();
break;
}
} else if (FILTER_VISITOR.process(schemaFilter, temp)) {
// normal timeseries
next = temp;
res = NOT_BLOCKED_TRUE;
break;
}
}
if (res == NOT_BLOCKED_FALSE && !cachedViewList.isEmpty()) {
// all schema info has been fetched, but there mau be still some view schema info in
// cachedViewList
res = asyncGetNext();
}
} else {
// next is not null
res = NOT_BLOCKED_TRUE;
}
isBlocked = res;
return res;
}
@SuppressWarnings("java:S2142")
@Override
public boolean hasNext() {
try {
return isBlocked().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public ITimeSeriesSchemaInfo next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
ITimeSeriesSchemaInfo result;
if (!consumeView) {
result = next;
next = null;
} else {
// it may return null if cachedViewList is empty but consumeView is true
result = cachedViewList.poll();
consumeView = !cachedViewList.isEmpty();
}
isBlocked = null;
return result;
}
private ListenableFuture<Boolean> asyncGetNext() {
// enter this function only when viewList is full or all schema info has been fetched and
// viewList is not empty
return Futures.submit(
() -> {
fetchViewTimeSeriesSchemaInfo();
if (consumeView) {
return true;
} else {
// all cache view is no satisfied
while (iterator.hasNext()) {
ITimeSeriesSchemaInfo temp = iterator.next();
if (temp.isLogicalView()) {
cachedViewList.add(temp.snapshot());
if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
fetchViewTimeSeriesSchemaInfo();
if (consumeView) {
return true;
}
}
} else if (FILTER_VISITOR.process(schemaFilter, temp)) {
next = temp;
return true;
}
}
return false;
}
},
FragmentInstanceManager.getInstance().getIntoOperationExecutor());
}
private void fetchViewTimeSeriesSchemaInfo() {
List<ITimeSeriesSchemaInfo> delayedLogicalViewList = new ArrayList<>();
List<ViewExpression> viewExpressionList = new ArrayList<>();
GetSourcePathsVisitor getSourcePathsVisitor = new GetSourcePathsVisitor();
List<PartialPath> sourcePathsNeedFetch;
PathPatternTree patternTree = new PathPatternTree();
for (ITimeSeriesSchemaInfo series : cachedViewList) {
delayedLogicalViewList.add(series);
ViewExpression viewExpression = ((LogicalViewSchema) series.getSchema()).getExpression();
viewExpressionList.add(((LogicalViewSchema) series.getSchema()).getExpression());
sourcePathsNeedFetch = getSourcePathsVisitor.process(viewExpression, null);
for (PartialPath path : sourcePathsNeedFetch) {
patternTree.appendFullPath(path);
}
}
// clear cachedViewList, all cached view will be added in the last step
cachedViewList.clear();
ISchemaTree schemaTree =
ClusterSchemaFetcher.getInstance().fetchSchema(patternTree, true, null);
// process each view expression and get data type
TransformToExpressionVisitor transformToExpressionVisitor = new TransformToExpressionVisitor();
CompleteMeasurementSchemaVisitor completeMeasurementSchemaVisitor =
new CompleteMeasurementSchemaVisitor();
Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
for (int i = 0; i < delayedLogicalViewList.size(); i++) {
ViewExpression viewExpression = viewExpressionList.get(i);
Expression expression = null;
boolean viewIsBroken = false;
try {
expression = transformToExpressionVisitor.process(viewExpression, null);
expression = completeMeasurementSchemaVisitor.process(expression, schemaTree);
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, expression);
} catch (Exception e) {
viewIsBroken = true;
}
delayedLogicalViewList
.get(i)
.getSchema()
.setType(viewIsBroken ? TSDataType.UNKNOWN : expressionTypes.get(NodeRef.of(expression)));
if (FILTER_VISITOR.process(schemaFilter, delayedLogicalViewList.get(i))) {
cachedViewList.add(delayedLogicalViewList.get(i));
}
}
consumeView = !cachedViewList.isEmpty();
}
}