blob: 5fc7c647669d773b674723499f97767d62234e09 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.converttorawindex;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Task generator for {@link MinionConstants.ConvertToRawIndexTask#TASK_TYPE}.
 *
 * <p>For every OFFLINE table it emits one task per segment that:
 * <ul>
 *   <li>is not already part of a running ConvertToRawIndexTask,</li>
 *   <li>survives segment-lineage filtering, and</li>
 *   <li>has not been marked as converted in its ZK metadata custom map.</li>
 * </ul>
 * At most {@code tableMaxNumTasks} tasks are generated per table per invocation.
 */
@TaskGenerator
public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
  private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);

  private ClusterInfoAccessor _clusterInfoAccessor;

  @Override
  public void init(ClusterInfoAccessor clusterInfoAccessor) {
    _clusterInfoAccessor = clusterInfoAccessor;
  }

  @Override
  public String getTaskType() {
    return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
  }

  /**
   * Builds the ConvertToRawIndexTask configs for the given tables.
   *
   * @param tableConfigs table configs to generate tasks for; non-OFFLINE tables are skipped with a warning
   * @return the generated task configs (possibly empty)
   * @throws NullPointerException if an OFFLINE table has no task config, or no config for this task type
   */
  @Override
  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();

    // Get the segments that are being converted so that we don't submit them again
    Set<Segment> runningSegments =
        TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoAccessor);

    for (TableConfig tableConfig : tableConfigs) {
      // Only generate tasks for OFFLINE tables
      String offlineTableName = tableConfig.getTableName();
      if (tableConfig.getTableType() != TableType.OFFLINE) {
        LOGGER.warn("Skip generating ConvertToRawIndexTask for non-OFFLINE table: {}", offlineTableName);
        continue;
      }

      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
      // Guava Preconditions message templates use %s placeholders (not SLF4J-style {})
      Preconditions.checkNotNull(tableTaskConfig, "Table task config shouldn't be null for Table: %s",
          offlineTableName);
      Map<String, String> taskConfigs =
          tableTaskConfig.getConfigsForTaskType(MinionConstants.ConvertToRawIndexTask.TASK_TYPE);
      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", offlineTableName);

      // Get max number of tasks for this table
      int tableMaxNumTasks = getTableMaxNumTasks(taskConfigs, offlineTableName);

      // Get the config for columns to convert (optional; forwarded to the task as-is when present)
      String columnsToConvertConfig = taskConfigs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY);

      // Generate tasks
      List<SegmentZKMetadata> offlineSegmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
      SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(offlineTableName);
      Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
      for (SegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentsZKMetadata) {
        preSelectedSegmentsBasedOnLineage.add(offlineSegmentZKMetadata.getSegmentName());
      }
      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);

      int tableNumTasks = 0;
      for (SegmentZKMetadata segmentZKMetadata : offlineSegmentsZKMetadata) {
        // Generate up to tableMaxNumTasks tasks each time for each table.
        // Note: a negative limit never equals the counter, so it behaves as "no limit".
        if (tableNumTasks == tableMaxNumTasks) {
          break;
        }

        // Skip segments that are already submitted
        String segmentName = segmentZKMetadata.getSegmentName();
        if (runningSegments.contains(new Segment(offlineTableName, segmentName))) {
          continue;
        }

        // Skip segments based on lineage: for COMPLETED lineage, segments in `segmentsFrom` will be removed by
        // retention manager; for IN_PROGRESS lineage, segments in `segmentsTo` are not uploaded yet
        if (!preSelectedSegmentsBasedOnLineage.contains(segmentName)) {
          continue;
        }

        // Only submit segments that have not been converted.
        // NOTE(review): the "converted" marker key checked here is COLUMNS_TO_CONVERT_KEY + TASK_TIME_SUFFIX;
        // confirm the executor writes the same key, otherwise converted segments would be resubmitted.
        Map<String, String> customMap = segmentZKMetadata.getCustomMap();
        if (customMap == null || !customMap.containsKey(
            MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + MinionConstants.TASK_TIME_SUFFIX)) {
          Map<String, String> configs = new HashMap<>();
          configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
          configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
          configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
          configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
          configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
          if (columnsToConvertConfig != null) {
            configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig);
          }
          pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, configs));
          tableNumTasks++;
        }
      }
    }

    return pinotTaskConfigs;
  }

  /**
   * Reads the per-table task limit from {@link MinionConstants#TABLE_MAX_NUM_TASKS_KEY}.
   *
   * @param taskConfigs task configs for this task type
   * @param tableName table name, used only for logging
   * @return the parsed limit, or {@link Integer#MAX_VALUE} when the key is absent or not a valid integer
   */
  private static int getTableMaxNumTasks(Map<String, String> taskConfigs, String tableName) {
    String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
    if (tableMaxNumTasksConfig == null) {
      return Integer.MAX_VALUE;
    }
    try {
      return Integer.parseInt(tableMaxNumTasksConfig);
    } catch (NumberFormatException e) {
      // Keep the lenient fallback (no limit), but make the misconfiguration visible
      LOGGER.warn("Invalid value: {} for config: {} of table: {}, using no limit", tableMaxNumTasksConfig,
          MinionConstants.TABLE_MAX_NUM_TASKS_KEY, tableName);
      return Integer.MAX_VALUE;
    }
  }
}