/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks;
import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class which provides a framework for a single segment conversion task that refreshes the existing segment.
* <p>This class handles segment download and upload.
 * <p>To extend this base class, override the {@link #convert(PinotTaskConfig, File, File)} method to plug in the
 * logic to convert the segment.
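 * <p>For illustration only, a minimal subclass could look like the sketch below (the class name and the custom-map
 * entry are hypothetical):
 * <pre>{@code
 * public class NoOpConversionExecutor extends BaseSingleSegmentConversionExecutor {
 *   protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) {
 *     // No conversion needed: return the input segment directory as the result
 *     Map<String, String> configs = pinotTaskConfig.getConfigs();
 *     return new SegmentConversionResult.Builder()
 *         .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
 *         .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY))
 *         .setFile(indexDir)
 *         .build();
 *   }
 *
 *   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
 *       PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) {
 *     // Record an illustrative entry in the segment ZK metadata custom map on upload
 *     return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
 *         Collections.singletonMap("noOpTask.executionTime", String.valueOf(System.currentTimeMillis())));
 *   }
 * }
 * }</pre>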
*/
public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingleSegmentConversionExecutor.class);
  /**
   * Converts the segment based on the given task config and returns the conversion result.
   */
  protected abstract SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
      throws Exception;

  /**
   * Returns the segment ZK metadata custom map modifier to apply when uploading the converted segment.
   */
  protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
      PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult);
@Override
public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
throws Exception {
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
String downloadURL = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
String originalSegmentCrc = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String authToken = configs.get(MinionConstants.AUTH_TOKEN);
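    // Skip the task if the segment has been modified since the task was scheduled (original CRC no longer matches)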
long currentSegmentCrc = getSegmentCrc(tableNameWithType, segmentName);
if (Long.parseLong(originalSegmentCrc) != currentSegmentCrc) {
LOGGER.info("Segment CRC does not match, skip the task. Original CRC: {}, current CRC: {}", originalSegmentCrc,
currentSegmentCrc);
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.build();
}
LOGGER.info("Start executing {} on table: {}, segment: {} with downloadURL: {}, uploadURL: {}", taskType,
tableNameWithType, segmentName, downloadURL, uploadURL);
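    // Create a temporary directory under the minion data dir for this task execution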
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temporary directory: %s", tempDataDir);
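    // Get the crypter class (if any) configured for the table, used to decrypt the downloaded segment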
String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
try {
// Download the tarred segment file
File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
LOGGER.info("Downloading segment from {} to {}", downloadURL, tarredSegmentFile.getAbsolutePath());
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURL, tarredSegmentFile, crypterName);
// Un-tar the segment file
File segmentDir = new File(tempDataDir, "segmentDir");
File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath());
}
// Convert the segment
File workingDir = new File(tempDataDir, "workingDir");
Preconditions.checkState(workingDir.mkdir());
SegmentConversionResult segmentConversionResult = convert(pinotTaskConfig, indexDir, workingDir);
Preconditions.checkState(segmentConversionResult.getSegmentName().equals(segmentName),
"Converted segment name: %s does not match original segment name: %s",
segmentConversionResult.getSegmentName(), segmentName);
// Tar the converted segment
File convertedSegmentDir = segmentConversionResult.getFile();
File convertedTarredSegmentFile =
new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile);
if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
}
      // Delete the input segment after tarring the converted segment to avoid deleting the converted segment when the
      // conversion happens in-place (converted segment dir is the same as the input segment dir). This can also happen
      // when no conversion is required and the input segment dir is returned as the result.
if (indexDir.exists() && !FileUtils.deleteQuietly(indexDir)) {
LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath());
}
      // Check whether the task got cancelled before uploading the segment
if (_cancelled) {
LOGGER.info("{} on table: {}, segment: {} got cancelled", taskType, tableNameWithType, segmentName);
throw new TaskCancelledException(
taskType + " on table: " + tableNameWithType + ", segment: " + segmentName + " got cancelled");
}
      // Set the original segment CRC into the HTTP IF-MATCH header to check whether the original segment got
      // refreshed, so that the newer segment won't get overridden
Header ifMatchHeader = new BasicHeader(HttpHeaders.IF_MATCH, originalSegmentCrc);
      // Set the refresh-only header so that the upload only refreshes an existing segment
Header refreshOnlyHeader = new BasicHeader(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY, "true");
      // Set the segment ZK metadata custom map modifier into the HTTP header to modify the segment ZK metadata
      // NOTE: even if the segment is not changed, the segment still needs to be uploaded to update the segment ZK
      // metadata so that the segment won't be submitted for the task again
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
Header segmentZKMetadataCustomMapModifierHeader =
new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
segmentZKMetadataCustomMapModifier.toJsonString());
List<Header> httpHeaders = new ArrayList<>();
httpHeaders.add(ifMatchHeader);
httpHeaders.add(refreshOnlyHeader);
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
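      // Add the auth header if an auth token is provided in the task config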
httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
// Set parameters for upload request.
NameValuePair enableParallelPushProtectionParameter =
new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
TableNameBuilder.extractRawTableName(tableNameWithType));
List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
// Upload the tarred segment
SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, segmentName, uploadURL,
convertedTarredSegmentFile);
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
}
LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
return segmentConversionResult;
} finally {
FileUtils.deleteQuietly(tempDataDir);
}
}
}