/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.solr.store.shared; |
| |
| import java.io.Closeable; |
| import java.lang.invoke.MethodHandles; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.solr.cloud.CloudDescriptor; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.core.CoreContainer; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.store.blob.process.CorePuller; |
| import org.apache.solr.store.blob.process.CorePusher; |
| import org.apache.solr.store.shared.metadata.SharedShardMetadataController; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class is responsible for bringing a stale SHARED core upto date by pulling from the shared store at the start |
| * of an indexing batch and pushing the updated core at the end of a successfully committed indexing batch. |
| */ |
| public class SharedCoreIndexingBatchProcessor implements Closeable { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| /** |
| * Time indexing thread needs to wait to try acquiring pull write lock before checking if someone else has already done the pull. |
| */ |
| public static int SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK = 5; |
| /** |
| * Max attempts by indexing thread to try acquiring pull write lock before bailing out. Ideally bail out scenario should never happen. |
| * If it does then either we are too slow in pulling and can tune this value or something else is wrong. |
| */ |
| public static int MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK = 10; |
| |
| private final SolrCore core; |
| private final String collectionName; |
| private final String shardName; |
| private final String sharedShardName; |
| private final CorePusher corePusher; |
| private final CorePuller corePuller; |
| private IndexingBatchState state; |
| private ReentrantReadWriteLock corePullLock; |
| |
| public SharedCoreIndexingBatchProcessor(SolrCore core, ClusterState clusterState) { |
| this.core = core; |
| CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor(); |
| collectionName = cloudDescriptor.getCollectionName(); |
| shardName = cloudDescriptor.getShardId(); |
| |
| DocCollection collection = clusterState.getCollection(collectionName); |
| if (!collection.getSharedIndex()) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, collectionName + " is not a shared collection."); |
| } |
| |
| Slice shard = collection.getSlicesMap().get(shardName); |
| if (shard == null) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing batch received for an unknown shard," + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName()); |
| } |
| |
| if (!Slice.State.ACTIVE.equals(shard.getState())) { |
| // This happens when we buffer updates for a sub shard. |
| // SHARED replica should eventually stop supporting buffered updates and then this should become a real exception |
| log.warn("Processing an indexing batch for a non-active shard," + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName()); |
| } |
| |
| sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME); |
| corePuller = new CorePuller(); |
| corePusher = new CorePusher(); |
| state = IndexingBatchState.NOT_STARTED; |
| } |
| |
| /** |
| * Should be called whenever a document is about to be added/deleted from the SHARED core. If it is the first doc |
| * of the core, this method will mark the start of an indexing batch and bring a stale SHARED core upto date by |
| * pulling from the shared store. |
| */ |
| public void addOrDeleteGoingToBeIndexedLocally() { |
| // Following logic is built on the assumption that one particular instance of this processor |
| // will solely be consumed by a single thread. And all the documents of indexing batch will be processed by this one instance. |
| String coreName = core.getName(); |
| if (IndexingBatchState.NOT_STARTED.equals(state)) { |
| startIndexingBatch(); |
| } else if (IndexingBatchState.STARTED.equals(state)) { |
| // do nothing, we only use this method to start an indexing batch once |
| } else if (IndexingBatchState.COMMITTED.equals(state)) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "Why are we adding/deleting a doc through an already committed indexing batch?" + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| } else { |
| throwUnknownStateError(); |
| } |
| } |
| |
| @VisibleForTesting |
| protected void startIndexingBatch() { |
| // Following pull logic should only run once before the first add/delete of an indexing batch is processed by this processor |
| |
| assert IndexingBatchState.NOT_STARTED.equals(state); |
| |
| if (corePullLock != null) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "How come we already have a pull read lock?" + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName()); |
| } |
| |
| String coreName = core.getName(); |
| CoreContainer coreContainer = core.getCoreContainer(); |
| SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController(); |
| corePullLock = concurrencyController.getCorePullLock(collectionName, shardName, coreName); |
| // from this point onward we should always exit this method with read lock (no matter failure or what) |
| try { |
| concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.INDEXING_BATCH_RECEIVED); |
| state = IndexingBatchState.STARTED; |
| SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName); |
| /** |
| * we only need to sync if there is no soft guarantee of being in sync. |
| * if there is one we will rely on that, and if we turned out to be wrong indexing will fail at push time |
| * and will remove this guarantee in {@link CorePusher#pushCoreToBlob(PushPullData)} |
| */ |
| if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) { |
| SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController(); |
| SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName); |
| if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) { |
| acquireWriteLockAndPull(collectionName, shardName, coreName, coreContainer); |
| } |
| } |
| } finally { |
| // acquire lock for the whole duration of update |
| // we should always leave with read lock acquired(failure or success), since it is the job of close method to release it |
| corePullLock.readLock().lock(); |
| concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.LOCAL_INDEXING_STARTED); |
| } |
| } |
| |
| private void acquireWriteLockAndPull(String collectionName, String shardName, String coreName, CoreContainer coreContainer) { |
| // There is a likelihood that many indexing requests came at once and realized we are out of sync. |
| // They all would try to acquire write lock. One of them makes progress to pull from shared store. |
| // After that regular indexing will see soft guarantee of equality and moves straight to indexing |
| // under read lock. Now it is possible that new indexing keeps coming in and read lock is never free. |
| // In that case the poor guys that came in earlier and wanted to pull will still be struggling(starving) to |
| // acquire write lock. Since we know that write lock is only needed by one to do the work, we will |
| // try time boxed acquisition and in case of failed acquisition we will see if some one else has already completed the pull. |
| // We will make few attempts before we bail out. Ideally bail out scenario should never happen. |
| // If it does then either we are too slow in pulling and can tune following parameters or something else is wrong. |
| int attempt = 1; |
| while (true) { |
| SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController(); |
| try { |
| // try acquiring write lock |
| if (corePullLock.writeLock().tryLock(SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK, TimeUnit.SECONDS)) { |
| try { |
| // while acquiring write lock things might have updated, should reestablish if pull is still needed |
| SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName); |
| if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) { |
| SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController(); |
| SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName); |
| if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) { |
| getCorePuller().pullCoreFromSharedStore(core, sharedShardName, shardVersionMetadata, /* isLeaderPulling */true); |
| } |
| } |
| } finally { |
| corePullLock.writeLock().unlock(); |
| } |
| // write lock acquisition was successful and we are in sync with shared store |
| break; |
| } else { |
| // we could not acquire write lock but see if some other thread has already done the pulling |
| SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName); |
| if (coreVersionMetadata.isSoftGuaranteeOfEquality()) { |
| log.info("Indexing thread timed out trying to acquire the pull write lock. " + |
| "But some other thread has done the pulling so we are good. " + |
| "attempt=" + attempt + " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| break; |
| } |
| // no one else has pulled yet either, lets make another attempt ourselves |
| attempt++; |
| if (attempt > MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing thread failed to acquire write lock for pull in " + |
| (SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK * MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) + " seconds." + |
| " And no one other thread either has done the pull during that time. " + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| } |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing thread interrupted while trying to acquire pull write lock." + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName, ie); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| protected CorePuller getCorePuller() { |
| return corePuller; |
| } |
| |
| /** |
| * Should be called after the SHARED core is successfully hard committed locally. This method will push the updated |
| * core to the shared store. If there was no local add/delete of a document for this processor then the push will be |
| * skipped. |
| */ |
| public void hardCommitCompletedLocally() { |
| finishIndexingBatch(); |
| } |
| |
| protected void finishIndexingBatch() { |
| String coreName = core.getName(); |
| if (IndexingBatchState.NOT_STARTED.equals(state)) { |
| // Since we did not added/deleted a single document therefore it is an isolated commit. |
| // Few ways isolated commit can manifest: |
| // 1. Client issuing a separate commit command after the update command. |
| // 2. SolrJ client issuing a separate follow up commit command to affected shards than actual indexing request |
| // even when SolrJ client's caller issued a single update command with commit=true. |
| // 3. The replica that received the indexing batch first was either a follower replica or the leader of another |
| // shard and only did the job of forwarding the docs to their rightful leader. Therefore, at the end it has |
| // nothing to commit. |
| // Shared replica has a hard requirement of processing each indexing batch with a hard commit(either explicit or |
| // implicit) because that is how, at the end of an indexing batch, synchronous push to shared store gets hold |
| // of the segment files on local disk. |
| // Therefore, isolated commits are meaningless for SHARED replicas and we can ignore writing to shared store. |
| // If we ever need an isolated commit to write to shared store for some scenario, we should first understand if a |
| // pull from shared store was done or not(why not) and push should happen under corePullLock.readLock() |
| state = IndexingBatchState.COMMITTED; |
| log.info("Isolated commit encountered for a SHARED replica, ignoring writing to shared store." + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| } else if (IndexingBatchState.STARTED.equals(state)) { |
| if (corePullLock == null) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "How were we able to start an indexing batch without acquiring a pull read lock?" + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| } |
| SharedCoreConcurrencyController concurrencyController = core.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController(); |
| concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.LOCAL_INDEXING_FINISHED); |
| state = IndexingBatchState.COMMITTED; |
| getCorePusher().pushCoreToSharedStore(core, sharedShardName); |
| } else if (IndexingBatchState.COMMITTED.equals(state)) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, |
| "Why are we committing an already committed indexing batch?" + |
| " collection=" + collectionName + " shard=" + shardName + " core=" + coreName); |
| } else { |
| throwUnknownStateError(); |
| } |
| } |
| |
| private void throwUnknownStateError() { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Programmer's error, unknown IndexingBatchState" + state |
| + " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName()); |
| } |
| |
| @VisibleForTesting |
| protected CorePusher getCorePusher() { |
| return corePusher; |
| } |
| |
| @Override |
| public void close() { |
| if (!IndexingBatchState.NOT_STARTED.equals(state)) { |
| try { |
| SharedCoreConcurrencyController concurrencyController = |
| core.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController(); |
| concurrencyController.recordState(collectionName, shardName, core.getName(), |
| SharedCoreConcurrencyController.SharedCoreStage.INDEXING_BATCH_FINISHED); |
| } catch (Exception ex) { |
| SolrException.log(log, "Error recording the finish of a SHARED core indexing batch", ex); |
| } |
| } |
| if (corePullLock != null) { |
| try { |
| // release read lock |
| corePullLock.readLock().unlock(); |
| } catch (Exception ex) { |
| SolrException.log(log, "Error releasing pull read lock of a SHARED core", ex); |
| } |
| } |
| } |
| |
| private enum IndexingBatchState { |
| NOT_STARTED, |
| STARTED, |
| COMMITTED |
| } |
| } |