blob: 51c8c9ad6c050251af979c51650921a63286ef0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.store.blob.process;
import java.lang.invoke.MethodHandles;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.store.blob.client.BlobCoreMetadata;
import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.metadata.CorePushPull;
import org.apache.solr.store.blob.metadata.PushPullData;
import org.apache.solr.store.blob.metadata.ServerSideMetadata;
import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
import org.apache.solr.store.blob.util.BlobStoreUtils;
import org.apache.solr.store.shared.SharedCoreConcurrencyController;
import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class executes synchronous pulls of cores from the shared store.
*/
public class CorePuller {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Brings a local core up to date with the shard's index in the shared store.
*
* @param core core to be pulled
* @param sharedShardName identifier for the shard index data located on a shared store
* @param shardVersionMetadata metadata pointing to the version of shard's index in the shared store to be pulled
* @param isLeaderInitiated whether pull is requested by a leader replica or not
*/
public void pullCoreFromSharedStore(SolrCore core, String sharedShardName,
SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata,
boolean isLeaderInitiated) {
CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
String collectionName = cloudDescriptor.getCollectionName();
String shardName = cloudDescriptor.getShardId();
String coreName = core.getName();
try {
log.info("Initiating pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
CoreContainer coreContainer = core.getCoreContainer();
SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
//no-op pull
BlobCoreMetadata emptyBlobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedShardName);
concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, shardVersionMetadata, emptyBlobCoreMetadata, isLeaderInitiated);
log.info("Pull successful, nothing to pull, collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
return;
}
concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.BLOB_PULL_STARTED);
try {
// Get blob metadata
String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardVersionMetadata.getMetadataSuffix());
CoreStorageClient blobClient = coreContainer.getSharedStoreManager().getBlobStorageProvider().getClient();
BlobCoreMetadata blobCoreMetadata = blobClient.pullCoreMetadata(sharedShardName, blobCoreMetadataName);
if (null == blobCoreMetadata) {
// Zookepeer and blob are out of sync, could be due to eventual consistency model in blob or something else went wrong.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"core.metadata file needed for pull is missing from shared store, blobCoreMetadataName=" + blobCoreMetadataName +
" shard=" + shardName +
" collectionName=" + collectionName +
" sharedShardName=" + sharedShardName);
} else if (blobCoreMetadata.getIsDeleted()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"core.metadata file needed for pull is marked deleted in shared store, blobCoreMetadataName=" + blobCoreMetadataName +
" shard=" + shardName +
" collectionName=" + collectionName +
" sharedShardName=" + sharedShardName);
} else if (blobCoreMetadata.getIsCorrupt()) {
log.warn("core.metadata file needed for pull is marked corrupt, skipping sync, collection=" + collectionName +
" shard=" + shardName + " coreName=" + coreName + " sharedShardName=" + sharedShardName);
return;
}
// Get local metadata + resolve with blob metadata. Given we're doing a pull, don't need to reserve commit point
// We do need to compute a directory hash to verify after pulling or before switching index dirs that no local
// changes occurred concurrently
ServerSideMetadata serverMetadata = new ServerSideMetadata(coreName, coreContainer,
/* reserveCommit */ false, /* captureDirHash */ true);
SharedStoreResolutionUtil.SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobCoreMetadata);
PushPullData pushPullData = new PushPullData.Builder()
.setCollectionName(collectionName)
.setShardName(shardName)
.setCoreName(coreName)
.setSharedStoreName(sharedShardName)
.build();
if (resolutionResult.getFilesToPull().size() > 0) {
BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
CorePushPull cp = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, serverMetadata, blobCoreMetadata);
cp.pullUpdateFromBlob(/* waitForSearcher */ true);
concurrencyController.updateCoreVersionMetadata(pushPullData.getCollectionName(), pushPullData.getShardName(), pushPullData.getCoreName(),
shardVersionMetadata, blobCoreMetadata, isLeaderInitiated);
} else {
log.warn("Why there are no files to pull even when we do not match with the version in zk? " +
"collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
}
} finally {
concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.BLOB_PULL_FINISHED);
}
} catch (Exception ex) {
// wrap every thrown exception in a solr exception
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to pull from the shared store," +
" collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName, ex);
}
}
}