blob: f035709792dfa3a68917311b47232b719cf038bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.blobstore;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Called periodically on a non-leader nimbus to update its blobs based on the state stored in ZooKeeper,
* so that it stays in sync with the operations performed on the leader nimbus.
*/
public class BlobSynchronizer {
    private static final Logger LOG = LoggerFactory.getLogger(BlobSynchronizer.class);

    /**
     * ZooKeeper client used by the current {@link #syncBlobs()} run. It is created at the start of
     * each run and closed when the run ends, so outside of a run it is either null or already closed.
     */
    private CuratorFramework zkClient;
    private Map conf;
    private BlobStore blobStore;
    /** Keys considered present in the local blob store; supplied via {@link #setBlobStoreKeySet(Set)}. */
    private Set<String> blobStoreKeySet = new HashSet<String>();
    /** Keys considered present in ZooKeeper; supplied via {@link #setZookeeperKeySet(Set)}. */
    private Set<String> zookeeperKeySet = new HashSet<String>();
    private NimbusInfo nimbusInfo;

    /**
     * Creates a synchronizer with empty key sets and no nimbus info.
     *
     * @param blobStore the blob store that blobs are deleted from, updated in and downloaded into
     * @param conf configuration handed to the {@code BlobStoreUtils} helpers
     */
    public BlobSynchronizer(BlobStore blobStore, Map conf) {
        this.blobStore = blobStore;
        this.conf = conf;
    }

    /**
     * Sets the nimbus info that is passed to {@code BlobStoreUtils} when blob state is updated or created.
     *
     * @param nimbusInfo the nimbus info to use
     */
    public void setNimbusInfo(NimbusInfo nimbusInfo) {
        this.nimbusInfo = nimbusInfo;
    }

    /**
     * Sets the ZooKeeper key set. The reference is stored as is, without copying.
     *
     * @param zookeeperKeySet the keys present in ZooKeeper
     */
    public void setZookeeperKeySet(Set<String> zookeeperKeySet) {
        this.zookeeperKeySet = zookeeperKeySet;
    }

    /**
     * Sets the blob store key set. The reference is stored as is, without copying.
     *
     * @param blobStoreKeySet the keys present in the blob store
     */
    public void setBlobStoreKeySet(Set<String> blobStoreKeySet) {
        this.blobStoreKeySet = blobStoreKeySet;
    }

    /**
     * Returns a fresh copy of the blob store key set.
     *
     * @return a new mutable set; changing it does not affect this synchronizer
     */
    public Set<String> getBlobStoreKeySet() {
        return new HashSet<String>(blobStoreKeySet);
    }

    /**
     * Returns a fresh copy of the ZooKeeper key set.
     *
     * @return a new mutable set; changing it does not affect this synchronizer
     */
    public Set<String> getZookeeperKeySet() {
        return new HashSet<String>(zookeeperKeySet);
    }

    /**
     * Runs one synchronization pass:
     * <ol>
     * <li>deletes blobs whose keys are in the blob store key set but not in the ZooKeeper key set,</li>
     * <li>updates the blobs named by the blob store key set,</li>
     * <li>downloads the blobs whose keys are in the ZooKeeper key set but not in the blob store key set,
     * creating state in ZooKeeper for each key for which {@code downloadMissingBlob} returns true.</li>
     * </ol>
     * Every step works on copies of the configured key sets, so the configured sets are not modified.
     * The ZooKeeper client created for the pass is closed when the pass ends, whether or not it failed.
     *
     * @throws RuntimeException wrapping any exception other than {@link InterruptedException}
     */
    public synchronized void syncBlobs() {
        // Track the client in a local so that only a client created by this run is closed in finally.
        CuratorFramework client = null;
        try {
            LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet());
            client = BlobStoreUtils.createZKClient(conf);
            // updateKeySetForBlobStore reads the client from the field.
            zkClient = client;
            deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
            updateKeySetForBlobStore(getBlobStoreKeySet());
            Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
            LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);

            for (String key : keySetToDownload) {
                try {
                    Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
                    LOG.debug("syncBlobs, key: {}, nimbusInfoSet: {}", key, nimbusInfoSet);
                    if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
                    }
                } catch (KeyNotFoundException e) {
                    // The key vanished between listing and download; skip it and carry on with the rest.
                    LOG.debug("Detected deletion for the key {} while downloading - skipping download", key);
                }
            }
        } catch (InterruptedException exp) {
            // Restore the interrupt status so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("InterruptedException {}", exp);
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        } finally {
            // Close on every exit path; previously a failure in any step leaked the client.
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Deletes from the blob store every key of {@code keySetBlobStore} that is not contained in
     * {@code keySetZookeeper}.
     *
     * <p>Note: {@code keySetBlobStore} is modified in place; on return it holds only the keys that
     * were selected for deletion.
     *
     * @param keySetBlobStore keys present in the blob store; reduced in place to the keys to delete
     * @param keySetZookeeper keys present in ZooKeeper; not modified
     * @throws Exception if deleting a blob fails
     */
    public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception {
        // Do not gate on removeAll's return value: it is false when the sets are disjoint,
        // which is exactly the case where every blob store key has to be deleted.
        keySetBlobStore.removeAll(keySetZookeeper);
        if (keySetBlobStore.isEmpty()) {
            return;
        }
        LOG.debug("Key set to delete in blobstore {}", keySetBlobStore);
        for (String key : keySetBlobStore) {
            blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
        }
    }

    /**
     * Updates each listed key inside the blob store via {@code BlobStoreUtils.updateKeyForBlobStore}
     * (per the original note: "if the version changes"). Uses the ZooKeeper client of the current
     * {@link #syncBlobs()} run.
     *
     * @param keySetBlobStore the keys to update
     * @throws RuntimeException wrapping any exception raised while updating
     */
    public void updateKeySetForBlobStore(Set<String> keySetBlobStore) {
        try {
            for (String key : keySetBlobStore) {
                LOG.debug("updating blob");
                BlobStoreUtils.updateKeyForBlobStore(conf, blobStore, zkClient, key, nimbusInfo);
            }
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }

    /**
     * Computes the keys to download: those in {@code zookeeperKeySet} that are not in
     * {@code blobStoreKeySet}.
     *
     * <p>Note: {@code zookeeperKeySet} is modified in place and is itself the returned set.
     *
     * @param blobStoreKeySet keys present in the blob store; not modified
     * @param zookeeperKeySet keys present in ZooKeeper; reduced in place to the keys to download
     * @return {@code zookeeperKeySet} after removing every key of {@code blobStoreKeySet}
     */
    public Set<String> getKeySetToDownload(Set<String> blobStoreKeySet, Set<String> zookeeperKeySet) {
        zookeeperKeySet.removeAll(blobStoreKeySet);
        LOG.debug("Key list to download {}", zookeeperKeySet);
        return zookeeperKeySet;
    }
}