blob: b3a7258b378f2a60ab10d7c084a7e594aa980900 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.api.upload;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The ZKOperator is a utility class that is used during segment upload to set the relevant metadata fields in ZK.
 * It currently also performs the data move. In the future, when we introduce versioning, we will decouple these
 * two steps.
 */
public class ZKOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(ZKOperator.class);
// Used to read and update segment ZK metadata records and to add or refresh segments
private final PinotHelixResourceManager _pinotHelixResourceManager;
// Controller configuration; read in this class for the segment upload timeout
private final ControllerConf _controllerConf;
// Metrics sink; used in this class to meter uploads that exceeded the segment upload timeout
private final ControllerMetrics _controllerMetrics;
/**
 * Creates an operator that works against the given resource manager, controller configuration and metrics.
 * The arguments are stored as-is; no validation or copying is performed here.
 *
 * @param pinotHelixResourceManager resource manager used for all ZK metadata reads and writes
 * @param controllerConf controller configuration
 * @param controllerMetrics controller metrics registry
 */
public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
ControllerMetrics controllerMetrics) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_controllerConf = controllerConf;
_controllerMetrics = controllerMetrics;
/**
 * Completes a segment upload for a table: looks up the segment's ZK metadata record and hands the upload off to
 * {@code processNewSegment} or {@code processExistingSegment}.
 *
 * <p>The request is rejected with a {@link ControllerApplicationException} when a refresh-only upload targets a
 * segment that has no ZK record (status GONE), or when a segment of a realtime table would be refreshed (status
 * NOT_IMPLEMENTED).
 *
 * <p>NOTE(review): some lines of this method are not visible in this view (closing braces, the initializer of
 * {@code refreshOnly}, the logger call prefixes); confirm the exact branch structure against the full source.
 *
 * @param tableNameWithType table name used for the ZK lookup and the realtime-table check
 * @param segmentMetadata metadata of the uploaded segment; its name identifies the segment
 * @param finalSegmentLocationURI forwarded to the new/existing segment handlers
 * @param currentSegmentLocation forwarded to the new/existing segment handlers
 * @param enableParallelPushProtection forwarded to {@code processExistingSegment} only
 * @param headers request headers, forwarded to the handlers
 * @param zkDownloadURI forwarded to the handlers
 * @param moveSegmentToFinalLocation forwarded to the handlers
 * @param crypter forwarded to the handlers
 * @throws Exception any failure raised by the lookup or by the delegated handler
 */
public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
throws Exception {
String segmentName = segmentMetadata.getName();
// A null record is treated below as "segment does not exist yet"
ZNRecord segmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
// NOTE(review): the right-hand side of this assignment is missing from this view — confirm against the full source
boolean refreshOnly =
if (segmentMetadataZNRecord == null) {
// A refresh-only upload must not create a segment that does not already exist
if (refreshOnly) {
throw new ControllerApplicationException(LOGGER,
"Cannot refresh non-existing segment, aborted uploading segment: " + segmentName + " of table: "
+ tableNameWithType, Response.Status.GONE);
}"Adding new segment {} from table {}", segmentName, tableNameWithType);
processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers,
crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation);
// NOTE(review): presumably the following lines form the else-branch (segment already exists) — confirm
// TODO Allow segment refreshing for realtime tables.
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
throw new ControllerApplicationException(LOGGER,
"Refresh existing segment " + segmentName + " for realtime table " + tableNameWithType
+ " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
}"Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
segmentMetadataZNRecord, moveSegmentToFinalLocation);
/**
 * Handles an upload for a segment that already has a ZK metadata record.
 *
 * <p>Visible steps, in order:
 * <ol>
 *   <li>Verify the optional IF-MATCH CRC precondition via {@code checkCRC}.</li>
 *   <li>When parallel push protection is enabled, reject the upload with CONFLICT if another upload is still in
 *   progress (upload start time set and not yet timed out), otherwise lock the segment through a versioned ZK
 *   metadata update.</li>
 *   <li>Build the custom map modifier (REPLACE mode by default).</li>
 *   <li>If the new CRC equals the existing CRC, update only the ZK metadata; otherwise optionally move the segment
 *   to its final location and refresh it.</li>
 *   <li>On any exception, write the ZK metadata back and rethrow.</li>
 * </ol>
 *
 * <p>NOTE(review): several statements are not visible in this view (closing braces, logger call prefixes, the
 * statements that set/reset the upload start time and bump {@code expectedVersion}, and the source of the custom
 * map modifier string); confirm details against the full source.
 *
 * @param segmentMetadata metadata of the uploaded segment; its CRC is compared with the existing CRC
 * @param finalSegmentLocationURI destination used when the segment is moved
 * @param currentSegmentLocation current (temporary) location of the uploaded segment file
 * @param enableParallelPushProtection whether to check and take the upload lock in ZK
 * @param headers request headers, passed to {@code checkCRC}
 * @param zkDownloadURI passed to the segment refresh call
 * @param crypter passed to the segment refresh call
 * @param tableNameWithType table the segment belongs to
 * @param segmentName name of the segment
 * @param znRecord existing ZK record of the segment; supplies the existing metadata and its version
 * @param moveSegmentToFinalLocation whether the segment file is moved before refreshing
 * @throws Exception rethrown after the ZK metadata has been written back in the catch block
 */
private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
boolean moveSegmentToFinalLocation)
throws Exception {
SegmentZKMetadata existingSegmentZKMetadata = new SegmentZKMetadata(znRecord);
long existingCrc = existingSegmentZKMetadata.getCrc();
// Version of the record as read; passed as the expected version to every updateZkMetadata call below
int expectedVersion = znRecord.getVersion();
// Check if CRC match when IF-MATCH header is set
checkCRC(headers, tableNameWithType, segmentName, existingCrc);
// Check segment upload start time when parallel push protection enabled
if (enableParallelPushProtection) {
// When segment upload start time is larger than 0, that means another upload is in progress
long segmentUploadStartTime = existingSegmentZKMetadata.getSegmentUploadStartTime();
if (segmentUploadStartTime > 0) {
// A lock older than the configured upload timeout is considered stale and is taken over
if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
// Last segment upload does not finish properly, replace the segment
LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName,
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
} else {
// Another segment upload is in progress
throw new ControllerApplicationException(LOGGER,
"Another segment upload is in progress for segment: " + segmentName + " of table: " + tableNameWithType
+ ", retry later", Response.Status.CONFLICT);
// Lock the segment by setting the upload start time in ZK
// NOTE(review): the statement that sets the upload start time is not visible in this view
if (!_pinotHelixResourceManager
.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
throw new ControllerApplicationException(LOGGER,
"Failed to lock the segment: " + segmentName + " of table: " + tableNameWithType + ", retry later",
} else {
// The version will increment if the zk metadata update is successful
// Reset segment upload start time to unlock the segment later
// NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case
// previous segment upload did not finish properly and the parallel push protection is turned off
try {
// Modify the custom map in segment ZK metadata
// NOTE(review): the initializer of the modifier string is not visible in this view
String segmentZKMetadataCustomMapModifierStr =
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier;
if (segmentZKMetadataCustomMapModifierStr != null) {
segmentZKMetadataCustomMapModifier =
new SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
} else {
// By default, use REPLACE modify mode
segmentZKMetadataCustomMapModifier =
new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, null);
// Update ZK metadata and refresh the segment if necessary
long newCrc = Long.valueOf(segmentMetadata.getCrc());
if (newCrc == existingCrc) {
"New segment crc '{}' is the same as existing segment crc for segment '{}'. Updating ZK metadata without "
+ "refreshing the segment.", newCrc, segmentName);
// NOTE: even though we don't need to refresh the segment, we should still update creation time and refresh time
// (creation time is not included in the crc)
// NOTE: in rare cases the segment can be deleted before the metadata is updated and the expected version won't
// match, we should fail the request for such cases
if (!_pinotHelixResourceManager
.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
throw new RuntimeException(
"Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
} else {
// New segment is different with the existing one, update ZK metadata and refresh the segment
"New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing "
+ "segment {}", newCrc, existingCrc, segmentName);
if (moveSegmentToFinalLocation) {
moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);"Moved segment {} from temp location {} to {}", segmentName,
currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
} else {"Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
.refreshSegment(tableNameWithType, segmentMetadata, existingSegmentZKMetadata, expectedVersion,
zkDownloadURI, crypter);
} catch (Exception e) {
// Best-effort write-back of the ZK metadata before rethrowing; a failed write-back is only logged
if (!_pinotHelixResourceManager
.updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, expectedVersion)) {
LOGGER.error("Failed to update ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
throw e;
private void checkCRC(HttpHeaders headers, String offlineTableName, String segmentName, long existingCrc) {
String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
if (expectedCrcStr != null) {
long expectedCrc;
try {
expectedCrc = Long.parseLong(expectedCrcStr);
} catch (NumberFormatException e) {
throw new ControllerApplicationException(LOGGER,
"Caught exception for segment: " + segmentName + " of table: " + offlineTableName
+ " while parsing IF-MATCH CRC: \"" + expectedCrcStr + "\"", Response.Status.PRECONDITION_FAILED);
if (expectedCrc != existingCrc) {
throw new ControllerApplicationException(LOGGER,
"For segment: " + segmentName + " of table: " + offlineTableName + ", expected CRC: " + expectedCrc
+ " does not match existing CRC: " + existingCrc, Response.Status.PRECONDITION_FAILED);
/**
 * Handles an upload for a segment that has no ZK metadata record yet.
 *
 * <p>Optionally moves the segment file to its final location, registers the segment through
 * {@code addNewSegment}, and, when the custom-map-modifier header is present, re-reads the freshly created ZK
 * record and writes the metadata back using the record's version as the expected version.
 *
 * <p>NOTE(review): some lines are not visible in this view (closing braces, logger receivers, trailing log
 * arguments, and the statement that applies the modifier to the segment ZK metadata); confirm against the full
 * source.
 *
 * @param segmentMetadata metadata of the uploaded segment
 * @param finalSegmentLocationURI destination used when the segment is moved
 * @param currentSegmentLocation current (temporary) location of the uploaded segment file
 * @param zkDownloadURI passed to {@code addNewSegment}
 * @param headers request headers; may be {@code null}, in which case no custom map modifier is applied
 * @param crypter passed to {@code addNewSegment}
 * @param tableNameWithType table the segment belongs to
 * @param segmentName name of the segment
 * @param moveSegmentToFinalLocation whether the segment file is moved before the segment is added
 * @throws Exception declared by the method; a failed move is wrapped in a {@link RuntimeException}, and a failed
 *         ZK metadata update is reported as a {@link RuntimeException}
 */
private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType,
String segmentName, boolean moveSegmentToFinalLocation)
throws Exception {
// For v1 segment uploads, we will not move the segment
if (moveSegmentToFinalLocation) {
try {
moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
} catch (Exception e) {
// The move failure is logged with its cause and rethrown unchecked, aborting before the segment is added
.error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, e);
throw new RuntimeException(e);
} else {"Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType,
_pinotHelixResourceManager.addNewSegment(tableNameWithType, segmentMetadata, zkDownloadURI, crypter);
// Update ZK metadata custom map
String segmentZKMetadataCustomMapModifierStr = headers != null ? headers
.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) : null;
if (segmentZKMetadataCustomMapModifierStr != null) {
// Re-read the record that addNewSegment just created so the update below uses its current version
ZNRecord segmentMetadataZnRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadataZnRecord);
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
new SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
// NOTE(review): the statement applying the modifier to segmentZKMetadata is not visible in this view
if (!_pinotHelixResourceManager
.updateZkMetadata(tableNameWithType, segmentZKMetadata, segmentMetadataZnRecord.getVersion())) {
throw new RuntimeException(
"Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
/**
 * Copies the segment file from its current local location to the final segment location, using the
 * {@link PinotFS} created for the scheme of the destination URI.
 *
 * <p>NOTE(review): the log statement in this method is truncated in this view, and the end of the method is not
 * visible; confirm against the full source.
 *
 * @param currentSegmentLocation local segment file to copy
 * @param finalSegmentLocationURI destination URI; its scheme selects the file system implementation
 * @throws Exception any failure raised while creating the file system or copying the file
 */
private void moveSegmentToPermanentDirectory(File currentSegmentLocation, URI finalSegmentLocationURI)
throws Exception {
PinotFS pinotFS = PinotFSFactory.create(finalSegmentLocationURI.getScheme());
// Overwrites current segment file"Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(),
pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);