blob: 78eb25d0ed495dfe097b6b477a50ec1abaa40eb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.physical.crud;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
protected final ZoneId zoneId;
protected Map<String, UDTFExecutor> columnName2Executor = new HashMap<>();
protected Map<Integer, UDTFExecutor> originalOutputColumnIndex2Executor = new HashMap<>();
protected List<String> datasetOutputColumnIndex2UdfColumnName = new ArrayList<>();
protected List<String> datasetOutputColumnIndex2RawQueryColumnName = new ArrayList<>();
protected Map<String, Integer> pathNameToReaderIndex;
public UDTFPlan(ZoneId zoneId) {
super();
this.zoneId = zoneId;
setOperatorType(Operator.OperatorType.UDTF);
}
@Override
public void constructUdfExecutors(List<UDFContext> udfContexts) {
for (int i = 0; i < udfContexts.size(); ++i) {
UDFContext context = udfContexts.get(i);
if (context == null) {
continue;
}
String columnName = context.getColumnName();
if (!columnName2Executor.containsKey(columnName)) {
UDTFExecutor executor = new UDTFExecutor(context, zoneId);
columnName2Executor.put(columnName, executor);
}
originalOutputColumnIndex2Executor.put(i, columnName2Executor.get(columnName));
}
}
@Override
public void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMB)
throws QueryProcessException {
Collection<UDTFExecutor> executors = columnName2Executor.values();
collectorMemoryBudgetInMB /= executors.size();
UDFRegistrationService.getInstance().acquireRegistrationLock();
// This statement must be surrounded by the registration lock.
UDFClassLoaderManager.getInstance().initializeUDFQuery(queryId);
try {
for (UDTFExecutor executor : executors) {
executor.beforeStart(queryId, collectorMemoryBudgetInMB);
}
} finally {
UDFRegistrationService.getInstance().releaseRegistrationLock();
}
}
@Override
public void finalizeUDFExecutors(long queryId) {
try {
for (UDTFExecutor executor : columnName2Executor.values()) {
executor.beforeDestroy();
}
} finally {
UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
}
}
public UDTFExecutor getExecutorByOriginalOutputColumnIndex(int originalOutputColumn) {
return originalOutputColumnIndex2Executor.get(originalOutputColumn);
}
public UDTFExecutor getExecutorByDataSetOutputColumnIndex(int datasetOutputIndex) {
return columnName2Executor.get(datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex));
}
public String getRawQueryColumnNameByDatasetOutputColumnIndex(int datasetOutputIndex) {
return datasetOutputColumnIndex2RawQueryColumnName.get(datasetOutputIndex);
}
public boolean isUdfColumn(int datasetOutputIndex) {
return datasetOutputColumnIndex2UdfColumnName.get(datasetOutputIndex) != null;
}
public int getReaderIndex(String pathName) {
return pathNameToReaderIndex.get(pathName);
}
public void addUdfOutputColumn(String udfDatasetOutputColumn) {
datasetOutputColumnIndex2UdfColumnName.add(udfDatasetOutputColumn);
datasetOutputColumnIndex2RawQueryColumnName.add(null);
}
public void addRawQueryOutputColumn(String rawQueryOutputColumn) {
datasetOutputColumnIndex2UdfColumnName.add(null);
datasetOutputColumnIndex2RawQueryColumnName.add(rawQueryOutputColumn);
}
public void setPathNameToReaderIndex(Map<String, Integer> pathNameToReaderIndex) {
this.pathNameToReaderIndex = pathNameToReaderIndex;
}
@Override
public String getColumnForDisplay(String columnForReader, int pathIndex) {
if (paths.get(pathIndex) == null) {
return this.getExecutorByOriginalOutputColumnIndex(pathIndex).getContext().getColumnName();
}
return columnForReader;
}
@Override
public boolean isRawQuery() {
return false;
}
}