blob: 79bbdeb327076db0b72a8adf585c11dee08050c5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF)
* under one or more contributor license agreements.
* See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
* See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.blobstore;
import java.nio.channels.ClosedByInterruptException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Called periodically on a non-leader nimbus to update its blobs
* based on the state stored in ZooKeeper, so that it stays in sync
* with the operations performed on the leader nimbus.
*/
public class LocalFsBlobStoreSynchronizer {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStoreSynchronizer.class);

    private final BlobStore blobStore;
    private final Map<String, Object> conf;
    private CuratorFramework zkClient;
    private NimbusInfo nimbusInfo;
    private Set<String> blobStoreKeySet = new HashSet<>();
    private Set<String> zookeeperKeySet = new HashSet<>();

    /**
     * Creates a synchronizer for the given blob store.
     *
     * @param blobStore the blob store that blobs are deleted from and downloaded into
     * @param conf      the configuration map handed to the {@link BlobStoreUtils} helpers
     */
    public LocalFsBlobStoreSynchronizer(BlobStore blobStore, Map<String, Object> conf) {
        this.blobStore = blobStore;
        this.conf = conf;
    }

    /**
     * Sets the identity of this nimbus. It is removed from the set of download candidates in
     * {@link #syncBlobs()} and passed to the {@link BlobStoreUtils} helpers.
     *
     * @param nimbusInfo the info describing this nimbus
     */
    public void setNimbusInfo(NimbusInfo nimbusInfo) {
        this.nimbusInfo = nimbusInfo;
    }

    /**
     * Sets the ZooKeeper client passed to the {@link BlobStoreUtils} helpers.
     *
     * @param zkClient the Curator client to use
     */
    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Returns a copy of the blob store key set.
     *
     * @return a new set holding the current blob store keys; changes to it do not affect this object
     */
    public Set<String> getBlobStoreKeySet() {
        return new HashSet<>(blobStoreKeySet);
    }

    /**
     * Sets the blob store key set. The given set is stored by reference, not copied.
     *
     * @param blobStoreKeySet the keys considered to be present in the local blob store
     */
    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
        this.blobStoreKeySet = blobStoreKeySet;
    }

    /**
     * Returns a copy of the ZooKeeper key set.
     *
     * @return a new set holding the current ZooKeeper keys; changes to it do not affect this object
     */
    public Set<String> getZookeeperKeySet() {
        return new HashSet<>(zookeeperKeySet);
    }

    /**
     * Sets the ZooKeeper key set. The given set is stored by reference, not copied.
     *
     * @param zookeeperKeySet the keys considered to be present in ZooKeeper
     */
    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
        this.zookeeperKeySet = zookeeperKeySet;
    }

    /**
     * Runs one synchronization pass over the configured key sets.
     *
     * <p>The pass works on copies of the two key sets and proceeds in three steps:
     * <ol>
     *   <li>delete blobs whose keys are in the blob store set but not in the ZooKeeper set;</li>
     *   <li>call {@link #updateKeySetForBlobStore(Set)} for every key in the blob store set;</li>
     *   <li>for every key in the ZooKeeper set but not in the blob store set, try to download the blob
     *       from the nimbodes returned by {@code BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob}
     *       (excluding this nimbus) and, when the download reports success, call
     *       {@code BlobStoreUtils.createStateInZookeeper} for the key.</li>
     * </ol>
     *
     * <p>A {@link KeyNotFoundException} for a single key during step 3 is logged and that key is skipped.
     * An interruption ends the pass; the interrupt status of the current thread is restored before returning.
     *
     * @throws RuntimeException wrapping any other exception raised during the pass
     */
    public synchronized void syncBlobs() {
        try {
            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}", getBlobStoreKeySet(), getZookeeperKeySet());
            // Each helper receives fresh copies because the helpers modify their arguments in place.
            deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
            updateKeySetForBlobStore(getBlobStoreKeySet());
            Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(),
                      keySetToDownload);

            for (String key : keySetToDownload) {
                try {
                    Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
                    // Removing self so as not to create a deadlock where a nimbus is trying to download a missing blob
                    // from itself
                    nimbusInfoSet.remove(this.nimbusInfo);
                    LOG.debug("syncBlobs, key: {}, nimbusInfoSet: {}", key, nimbusInfoSet);
                    if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
                    }
                } catch (KeyNotFoundException e) {
                    LOG.debug("Detected deletion for the key {} while downloading - skipping download", key);
                }
            }
        } catch (InterruptedException | ClosedByInterruptException exp) {
            // Restore the interrupt status so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            // No "{}" placeholder: SLF4J treats a trailing Throwable as the exception to log, not as a parameter.
            LOG.error("Interrupt Exception", exp);
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }

    /**
     * Deletes from the blob store every key of {@code keySetBlobStore} that is not contained in
     * {@code keySetZookeeper}.
     *
     * <p>{@code keySetBlobStore} is modified in place: the keys also found in {@code keySetZookeeper} are
     * removed from it, so afterwards it holds exactly the keys selected for deletion.
     *
     * @param keySetBlobStore keys present in the blob store; modified by this call
     * @param keySetZookeeper keys present in ZooKeeper; not modified
     * @throws Exception if deleting a blob fails
     */
    public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception {
        // The result of removeAll() only tells whether the sets overlapped, so it must not gate the deletion:
        // with disjoint sets nothing is removed here, yet every remaining key is still absent from ZooKeeper.
        keySetBlobStore.removeAll(keySetZookeeper);
        if (keySetBlobStore.isEmpty()) {
            return;
        }
        LOG.debug("Key set to delete in blobstore {}", keySetBlobStore);
        for (String key : keySetBlobStore) {
            blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
        }
    }

    /**
     * Calls {@code BlobStoreUtils.updateKeyForBlobStore} for every key in the given set, to update the
     * current key list inside the blob store if the version changes.
     *
     * @param keySetBlobStore the keys to update
     * @throws RuntimeException wrapping any exception raised while updating a key
     */
    public void updateKeySetForBlobStore(Set<String> keySetBlobStore) {
        try {
            for (String key : keySetBlobStore) {
                LOG.debug("updating blob");
                BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClient, key, nimbusInfo);
            }
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }

    /**
     * Computes the keys to download: those in {@code zookeeperKeySet} that are not in {@code blobStoreKeySet}.
     *
     * <p>{@code zookeeperKeySet} is modified in place and the very same instance is returned, so pass a
     * copy if the original set must stay intact.
     *
     * @param blobStoreKeySet keys already present in the blob store; not modified
     * @param zookeeperKeySet keys present in ZooKeeper; modified by this call
     * @return {@code zookeeperKeySet}, reduced to the keys missing from the blob store
     */
    public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, Set<String> zookeeperKeySet) {
        zookeeperKeySet.removeAll(blobStoreKeySet);
        LOG.debug("Key list to download {}", zookeeperKeySet);
        return zookeeperKeySet;
    }
}