| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.blobstore; |
| |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.TreeSet; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.nimbus.NimbusInfo; |
| import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework; |
| import org.apache.storm.shade.org.apache.zookeeper.CreateMode; |
| import org.apache.storm.shade.org.apache.zookeeper.KeeperException; |
| import org.apache.storm.shade.org.apache.zookeeper.ZooDefs; |
| import org.apache.storm.utils.WrappedKeyNotFoundException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Class hands over the key sequence number which implies the number of updates made to a blob. |
| * The information regarding the keys and the sequence number which represents the number of updates are |
| * stored within the zookeeper in the following format. |
| * /storm/blobstore/key_name/nimbushostport-sequencenumber |
| * Example: |
| * If there are two nimbodes with nimbus.seeds:leader,non-leader are set, |
| * then the state inside the zookeeper is eventually stored as: |
| * /storm/blobstore/key1/leader:8080-1 |
| * /storm/blobstore/key1/non-leader:8080-1 |
| * indicates that a new blob with the name key1 has been created on the leader |
| * nimbus and the non-leader nimbus syncs after a call back is triggered by attempting |
| * to download the blob and finally updates its state inside the zookeeper. |
| * |
| * <p>A watch is placed on /storm/blobstore/key1, and the znodes leader:8080-1 and |
| * non-leader:8080-1 are ephemeral, which implies that these nodes exist only as long as the |
| * connection between the corresponding nimbus and the zookeeper persists. If the |
| * nimbus crashes, its node disappears from under /storm/blobstore/key1. |
| * |
| * <p>The sequence numbers for the keys are handed over based on the following scenarios: |
| * Let's assume there are three nimbodes up and running, one being the leader and the others |
| * being non-leaders. |
| * |
| * <p>1. Create is straight forward. |
| * Check whether the znode /storm/blobstore/key1 exists. If it does not, the blob has not |
| * been created yet, so it is created and the zookeeper state is updated under |
| * /storm/blobstore/key1 and /storm/blobstoremaxkeysequencenumber/key1. |
| * The znodes it creates on these nodes are /storm/blobstore/key1/leader:8080-1, |
| * /storm/blobstore/key1/non-leader:8080-1 and /storm/blobstoremaxkeysequencenumber/key1/1. |
| * The latter holds the global sequence number across all nimbodes more like a static variable |
| * indicating the true value of number of updates for a blob. This node helps to maintain sanity in case |
| * leadership changes due to crashing. |
| * |
| * <p>2. Delete does not require to hand over the sequence number. |
| * |
| * <p>3. Finally, the update has few scenarios. |
| * |
| * <p>The class collects the sequence numbers reported by the nimbodes in a TreeSet. If all the |
| * nimbodes have the same sequence number for the blob, then the set has exactly 1 element, which |
| * is the latest value of the sequence number. If the number of elements is greater than 1, then |
| * there is a sequence mismatch and the blobs need to be synced across |
| * nimbodes. |
| * |
| * <p>The logic for handing over sequence numbers based on the state are described as follows |
| * Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2. |
| * Scenario 1: |
| * Example: Normal create/update scenario |
| * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num |
| * Create-Key1 alive - Leader alive 1 1 |
| * Sync alive - Leader alive 1 1 (callback -> download) 1 |
| * Update-Key1 alive - Leader alive 2 1 2 |
| * Sync alive - Leader alive 2 2 (callback -> download) 2 |
| * |
| * <p>Scenario 2: |
| * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again |
| * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num |
| * Create alive - Leader alive 1 1 |
| * Sync alive - Leader alive 1 1 (callback -> download) 1 |
| * Update alive - Leader alive 2 1 2 |
| * Sync alive - Leader alive 2 2 (callback -> download) 2 |
| * Update alive - Leader alive 3 2 3 |
| * Crash crash - Leader alive 3 2 3 |
| * New - Leader crash alive - Leader 3 (Invalid) 2 3 |
| * Update crash alive - Leader 3 (Invalid) 4 (max-seq-num + 1) 4 |
| * N1-Restored alive alive - Leader 0 4 4 |
| * Sync alive alive - Leader 4 4 4 |
| * |
| * <p>Scenario 3: |
| * Example: Leader nimbus crash followed by leader election, ex-leader restored and then a read/update |
| * Operation Nimbus-1:state Nimbus-2:state Seq-Num-Nimbus-1 Seq-Num-Nimbus-2 Max-Seq-Num |
| * Create alive - Leader alive 1 1 |
| * Sync alive - Leader alive 1 1 (callback -> download) 1 |
| * Update alive - Leader alive 2 1 2 |
| * Sync alive - Leader alive 2 2 (callback -> download) 2 |
| * Update alive - Leader alive 3 2 3 |
| * Crash crash - Leader alive 3 2 3 |
| * Elect Leader crash alive - Leader 3 (Invalid) 2 3 |
| * N1-Restored alive alive - Leader 3 2 3 |
| * Read/Update alive alive - Leader 3 4 (Downloads from N1) 4 |
| * Sync alive alive - Leader 4 (callback) 4 4 |
| * Here the download is triggered whenever an operation corresponding to the blob is triggered on the |
| * nimbus like a read or update operation. Here, in the read/update call it is hard to know which call |
| * is read or update. Hence, by incrementing the sequence number to max-seq-num + 1 we ensure that the |
| * synchronization happens appropriately and all nimbodes have the same blob. |
| */ |
| public class KeySequenceNumber { |
| private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class); |
| private static final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE = "/blobstoremaxkeysequencenumber"; |
| private final String key; |
| private final NimbusInfo nimbusInfo; |
| private static final int INT_CAPACITY = 4; |
| private static final int INITIAL_SEQUENCE_NUMBER = 1; |
| |
| public KeySequenceNumber(String key, NimbusInfo nimbusInfo) { |
| this.key = key; |
| this.nimbusInfo = nimbusInfo; |
| } |
| |
| public synchronized int getKeySequenceNumber(CuratorFramework zkClient) throws KeyNotFoundException { |
| TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); |
| try { |
| // Key has not been created yet and it is the first time it is being created |
| if (zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key) == null) { |
| zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) |
| .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); |
| zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, |
| ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array()); |
| return INITIAL_SEQUENCE_NUMBER; |
| } |
| |
| // When all nimbodes go down and one or few of them come up |
| // Unfortunately there might not be an exact way to know which one contains the most updated blob, |
| // if all go down which is unlikely. Hence there might be a need to update the blob if all go down. |
| List<String> stateInfoList = zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key); |
| LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList); |
| if (stateInfoList.isEmpty()) { |
| return getMaxSequenceNumber(zkClient); |
| } |
| |
| LOG.debug("stateInfoSize {}", stateInfoList.size()); |
| // In all other cases check for the latest update sequence of the blob on the nimbus |
| // and assign the appropriate number. Check if all are have same sequence number, |
| // if not assign the highest sequence number. |
| for (String stateInfo : stateInfoList) { |
| sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo) |
| .getSequenceNumber())); |
| } |
| |
| // Update scenario 2 and 3 explain the code logic written here |
| // especially when nimbus crashes and comes up after and before update |
| // respectively. |
| int currentSeqNumber = getMaxSequenceNumber(zkClient); |
| if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) { |
| if (sequenceNumbers.last() < currentSeqNumber) { |
| return currentSeqNumber; |
| } else { |
| return INITIAL_SEQUENCE_NUMBER - 1; |
| } |
| } |
| |
| // It covers scenarios expalined in scenario 3 when nimbus-1 holding the latest |
| // update goes down before it is downloaded by nimbus-2. Nimbus-2 gets elected as a leader |
| // after which nimbus-1 comes back up and a read or update is performed. |
| if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && nimbusInfo.isLeader()) { |
| incrementMaxSequenceNumber(zkClient, currentSeqNumber); |
| return currentSeqNumber + 1; |
| } |
| |
| // This code logic covers the update scenarios in 2 when the nimbus-1 goes down |
| // before syncing the blob to nimbus-2 and an update happens. |
| // If seq-num for nimbus-2 is 2 and max-seq-number is 3 then next sequence number is 4 |
| // (max-seq-number + 1). |
| // Other scenario it covers is when max-seq-number and nimbus seq number are equal. |
| if (sequenceNumbers.size() == 1) { |
| if (sequenceNumbers.first() < currentSeqNumber) { |
| incrementMaxSequenceNumber(zkClient, currentSeqNumber); |
| return currentSeqNumber + 1; |
| } else { |
| incrementMaxSequenceNumber(zkClient, currentSeqNumber); |
| return sequenceNumbers.first() + 1; |
| } |
| } |
| |
| // Normal create update sync scenario returns the greatest sequence number in the set |
| return sequenceNumbers.last(); |
| } catch (KeeperException.NoNodeException e) { |
| // there's a race condition with a delete: either blobstore or blobstoremaxsequence |
| // this should be thrown to the caller to indicate that the key is invalid now |
| throw new WrappedKeyNotFoundException(key); |
| } catch (Exception e) { |
| // in other case, just set this to 0 to trigger re-sync later |
| LOG.error("Exception {}", e); |
| return INITIAL_SEQUENCE_NUMBER - 1; |
| } |
| } |
| |
| private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) { |
| boolean containsNimbusHost = false; |
| for (String stateInfo : stateInfoList) { |
| if (stateInfo.contains(nimbusInfo.getHost())) { |
| containsNimbusHost = true; |
| break; |
| } |
| } |
| return containsNimbusHost; |
| } |
| |
| private void incrementMaxSequenceNumber(CuratorFramework zkClient, int count) throws Exception { |
| zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, |
| ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array()); |
| } |
| |
| private int getMaxSequenceNumber(CuratorFramework zkClient) throws Exception { |
| return ByteBuffer.wrap(zkClient.getData() |
| .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt(); |
| } |
| } |