blob: a0644fa8715db41916723a6ad3870ec3dcf3efb3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.blobstore;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.Config;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.CuratorUtils;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.WrappedKeyNotFoundException;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BlobStoreUtils {
    private static final String BLOBSTORE_SUBTREE = "/blobstore";
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);

    /**
     * Returns the ZooKeeper path under which per-key blob state nodes live.
     *
     * @return the blob store subtree path, {@code "/blobstore"}
     */
    public static String getBlobStoreSubtree() {
        return BLOBSTORE_SUBTREE;
    }

    /**
     * Builds the ZooKeeper path of the state node for a blob key.
     */
    private static String keyPath(String key) {
        return BLOBSTORE_SUBTREE + "/" + key;
    }

    /**
     * Creates and starts a Curator client from the ZooKeeper settings in {@code conf}.
     *
     * <p>The client is built from the configured servers, port, root, auth info and the default ZooKeeper ACLs of
     * {@code type}. It has already been started when returned; this method never closes it.
     *
     * @param conf the Storm configuration
     * @param type the daemon type whose default ZooKeeper ACLs are applied
     * @return a started Curator client
     */
    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
    public static CuratorFramework createZKClient(Map<String, Object> conf, DaemonType type) {
        @SuppressWarnings("unchecked")
        List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
        CuratorFramework zkClient = CuratorUtils.newCurator(conf, zkServers, port,
            (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo,
            type.getDefaultZkAcls(conf));
        zkClient.start();
        return zkClient;
    }

    /**
     * Builds a fresh {@link Subject} whose only principal is a {@link NimbusPrincipal}.
     *
     * <p>In this class it is used as the acting subject for local blob store create/update calls.
     *
     * @return a new subject containing one {@code NimbusPrincipal}
     */
    public static Subject getNimbusSubject() {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NimbusPrincipal());
        return subject;
    }

    /**
     * Splits a ZooKeeper state entry of the form {@code <nimbusHostPort>-<sequenceNumber>}.
     *
     * <p>The split happens at the LAST {@code '-'}, so the host/port part may itself contain dashes.
     *
     * @param nimbusSeqNumberInfo the child node name to parse
     * @return the parsed host/port and (still textual) sequence number
     * @throws IllegalArgumentException if the entry contains no {@code '-'} separator
     */
    public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(String nimbusSeqNumberInfo) {
        int lastIndex = nimbusSeqNumberInfo.lastIndexOf('-');
        if (lastIndex < 0) {
            // Previously surfaced as an opaque StringIndexOutOfBoundsException from substring(0, -1).
            throw new IllegalArgumentException(
                "Expected '<nimbusHostPort>-<sequenceNumber>' but got: " + nimbusSeqNumberInfo);
        }
        BlobKeySequenceInfo keySequenceInfo = new BlobKeySequenceInfo();
        keySequenceInfo.setNimbusHostPort(nimbusSeqNumberInfo.substring(0, lastIndex));
        keySequenceInfo.setSequenceNumber(nimbusSeqNumberInfo.substring(lastIndex + 1));
        return keySequenceInfo;
    }

    /**
     * Finds the nimbodes that hold the highest sequence number recorded in ZooKeeper for {@code key}.
     *
     * @param zkClient a started Curator client
     * @param key      the blob key
     * @return the nimbodes whose state entry carries the maximum sequence number (empty if there are no entries)
     * @throws WrappedKeyNotFoundException if the key's state node does not exist (e.g. it was deleted concurrently)
     * @throws Exception                   on other ZooKeeper or parsing failures
     */
    public static Set<NimbusInfo> getNimbodesWithLatestSequenceNumberOfBlob(CuratorFramework zkClient, String key) throws Exception {
        List<String> stateInfoList;
        try {
            stateInfoList = zkClient.getChildren().forPath(keyPath(key));
        } catch (KeeperException.NoNodeException e) {
            // There is a race condition with a blobstore delete; surface it to the caller so it knows
            // the key is no longer valid.
            throw new WrappedKeyNotFoundException(key);
        }
        Set<NimbusInfo> nimbusInfoSet = new HashSet<NimbusInfo>();
        int latestSeqNumber = getLatestSequenceNumber(stateInfoList);
        LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} version {}", stateInfoList, latestSeqNumber);
        // Keep only the nimbodes that are at the latest version.
        for (String state : stateInfoList) {
            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
            if (latestSeqNumber == Integer.parseInt(sequenceInfo.getSequenceNumber())) {
                nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort()));
            }
        }
        LOG.debug("nimbusInfoList {}", nimbusInfoSet);
        return nimbusInfoSet;
    }

    /**
     * Returns the highest sequence number among {@code <nimbusHostPort>-<sequenceNumber>} entries.
     *
     * @param stateInfoList the state entries to scan
     * @return the maximum sequence number, or 0 when the list is empty
     * @throws NumberFormatException if a sequence number part is not an integer
     */
    public static int getLatestSequenceNumber(List<String> stateInfoList) {
        int seqNumber = 0;
        for (String state : stateInfoList) {
            BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state);
            int currentSeqNumber = Integer.parseInt(sequenceInfo.getSequenceNumber());
            if (seqNumber < currentSeqNumber) {
                seqNumber = currentSeqNumber;
                LOG.debug("Sequence Info {}", seqNumber);
            }
        }
        LOG.debug("Latest Sequence Number {}", seqNumber);
        return seqNumber;
    }

    /**
     * Copies a blob that is missing locally from the first of {@code nimbusInfos} that can serve it.
     *
     * <p>Each candidate is tried in turn. For each one, the blob's metadata and content are read over a
     * {@link NimbusClient}, and the blob is created in {@code blobStore} with the remote settable metadata. The attempt
     * counts as a success only if the key is then listed by {@code blobStore}. Key-already-exists, key-not-found and
     * connection/other errors are logged and the next candidate is tried.
     *
     * @param conf        the Storm configuration
     * @param blobStore   the local blob store to populate
     * @param key         the blob key
     * @param nimbusInfos candidate nimbodes to download from
     * @return {@code true} if the blob was created locally
     * @throws TTransportException declared for compatibility; client connection errors are caught and logged here
     * @throws RuntimeException    wrapping an {@link IOException} or {@link AuthorizationException}
     */
    public static boolean downloadMissingBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
        throws TTransportException {
        boolean isSuccess = false;
        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
        for (NimbusInfo nimbusInfo : nimbusInfos) {
            if (isSuccess) {
                break;
            }
            LOG.debug("Download blob key: {}, NimbusInfo {}", key, nimbusInfo);
            try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                ReadableBlobMeta rbm = client.getClient().getBlobMeta(key);
                ClientBlobStore remoteBlobStore = new NimbusBlobStore();
                remoteBlobStore.setClient(conf, client);
                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
                    blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
                }
                // Only report success once the key is actually visible in the local blob store.
                Iterator<String> keyIterator = blobStore.listKeys();
                while (keyIterator.hasNext()) {
                    if (keyIterator.next().equals(key)) {
                        LOG.debug("Success creating key, {}", key);
                        isSuccess = true;
                        break;
                    }
                }
            } catch (IOException | AuthorizationException exception) {
                throw new RuntimeException(exception);
            } catch (KeyAlreadyExistsException kae) {
                LOG.info("KeyAlreadyExistsException Key: {} {}", key, kae);
            } catch (KeyNotFoundException knf) {
                // Catching and logging KeyNotFoundException because, if
                // there is a subsequent update and delete, the non-leader
                // nimbodes might throw an exception.
                LOG.info("KeyNotFoundException Key: {} {}", key, knf);
            } catch (Exception exp) {
                // Typically a failure while the client is connecting. The throwable is passed as the trailing
                // argument (not consumed by a placeholder) so SLF4J logs the full stack trace.
                LOG.error("Exception while downloading blob key: {} from {}", key, nimbusInfo, exp);
            }
        }
        if (!isSuccess) {
            LOG.error("Could not download the blob with key: {}", key);
        }
        return isSuccess;
    }

    /**
     * Overwrites the local copy of {@code key} with the content served by the first of {@code nimbusInfos} that
     * succeeds.
     *
     * <p>The remote content is streamed into an {@link AtomicOutputStream} obtained from
     * {@link BlobStore#updateBlob}. The stream is closed on success and cancelled on any failure, so a failed
     * attempt does not commit partial content.
     *
     * @param conf        the Storm configuration
     * @param blobStore   the local blob store to update
     * @param key         the blob key
     * @param nimbusInfos candidate nimbodes to download from
     * @return {@code true} if the local blob was updated
     * @throws TTransportException declared for compatibility; client connection errors are caught and logged here
     * @throws RuntimeException    wrapping an {@link IOException} (other than {@link FileNotFoundException}) or an
     *                             {@link AuthorizationException}
     */
    public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
        throws TTransportException {
        boolean isSuccess = false;
        LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
        for (NimbusInfo nimbusInfo : nimbusInfos) {
            if (isSuccess) {
                break;
            }
            // Scoped per attempt so a stream cancelled in one iteration is never cancelled again in the next.
            AtomicOutputStream out = null;
            try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                ClientBlobStore remoteBlobStore = new NimbusBlobStore();
                remoteBlobStore.setClient(conf, client);
                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
                    out = blobStore.updateBlob(key, getNimbusSubject());
                    byte[] buffer = new byte[2048];
                    int len;
                    while ((len = in.read(buffer)) > 0) {
                        out.write(buffer, 0, len);
                    }
                    out.close();
                    // Committed: nothing left for the finally block to cancel.
                    out = null;
                }
                isSuccess = true;
            } catch (FileNotFoundException fnf) {
                LOG.warn("Blobstore file for key '{}' does not exist or got deleted before it could be downloaded.", key, fnf);
            } catch (IOException | AuthorizationException exception) {
                throw new RuntimeException(exception);
            } catch (KeyNotFoundException knf) {
                // Catching and logging KeyNotFoundException because, if
                // there is a subsequent update and delete, the non-leader
                // nimbodes might throw an exception.
                LOG.info("KeyNotFoundException", knf);
            } catch (Exception exp) {
                // Typically a failure while the client is connecting.
                LOG.error("Exception while updating blob key: {} from {}", key, nimbusInfo, exp);
            } finally {
                if (out != null) {
                    try {
                        out.cancel();
                    } catch (IOException ignored) {
                        // Best-effort cleanup of an uncommitted update; the original failure is already handled above.
                    }
                }
            }
        }
        if (!isSuccess) {
            LOG.error("Could not update the blob with key: {}", key);
        }
        return isSuccess;
    }

    /**
     * Collects every key reported by {@link BlobStore#listKeys()} into a list.
     *
     * @param blobStore the blob store to enumerate
     * @return the keys in iteration order; empty if {@code listKeys()} returns {@code null}
     * @throws Exception declared for compatibility with existing callers
     */
    public static List<String> getKeyListFromBlobStore(BlobStore blobStore) throws Exception {
        Iterator<String> keys = blobStore.listKeys();
        List<String> keyList = new ArrayList<String>();
        if (keys != null) {
            while (keys.hasNext()) {
                keyList.add(keys.next());
            }
        }
        LOG.debug("KeyList from blobstore {}", keyList);
        return keyList;
    }

    /**
     * Asks the nimbus at {@code nimbusInfo} to create the ZooKeeper state entry for {@code key}.
     *
     * <p>The Thrift connection is opened only for the duration of this call and is always closed afterwards.
     *
     * @param conf       the Storm configuration
     * @param key        the blob key
     * @param nimbusInfo the nimbus to contact
     * @throws TTransportException if the connection to the nimbus cannot be established
     */
    public static void createStateInZookeeper(Map<String, Object> conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
        // Previously the NimbusClient was created inline and never closed, leaking a connection per call.
        try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
            ClientBlobStore cb = new NimbusBlobStore();
            cb.setClient(conf, client);
            cb.createStateInZookeeper(key);
        }
    }

    /**
     * Brings this nimbus's copy of {@code key} up to date if ZooKeeper shows it is behind.
     *
     * <p>The method returns without doing anything in these cases:
     * <ul>
     *   <li>{@code nimbusDetails} is {@code null};</li>
     *   <li>the key's state node does not exist;</li>
     *   <li>the state node has no children.</li>
     * </ul>
     *
     * <p>Otherwise, if this host is not among the nimbodes holding the latest sequence number, the blob is
     * re-downloaded from those nimbodes. After a successful download, the state entry for this nimbus is created
     * via {@link #createStateInZookeeper}.
     *
     * @param conf          the Storm configuration
     * @param blobStore     the local blob store
     * @param zkClient      a started Curator client
     * @param key           the blob key
     * @param nimbusDetails this nimbus's host/port, or {@code null} (see above)
     * @throws RuntimeException wrapping any failure other than a concurrent delete (missing node / key)
     */
    public static void updateKeyForBlobStore(Map<String, Object> conf, BlobStore blobStore, CuratorFramework zkClient, String key,
                                             NimbusInfo nimbusDetails) {
        try {
            // Most of clojure tests currently try to access the blobs using getBlob. Since, updateKeyForBlobStore
            // checks for updating the correct version of the blob as a part of nimbus ha before performing any
            // operation on it, there is a necessity to stub several test cases to ignore this method. It is a valid
            // trade off to return if nimbusDetails which include the details of the current nimbus host port data are
            // not initialized as a part of the test. Moreover, this applies to only local blobstore when used along with
            // nimbus ha.
            if (nimbusDetails == null) {
                return;
            }
            String path = keyPath(key);
            if (zkClient.checkExists().forPath(path) == null) {
                return;
            }
            List<String> stateInfo = zkClient.getChildren().forPath(path);
            if (CollectionUtils.isEmpty(stateInfo)) {
                return;
            }
            LOG.debug("StateInfo for update {}", stateInfo);
            Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
            // NOTE(review): only the host is compared, not the port -- presumably one nimbus per host; confirm.
            boolean isListContainsCurrentNimbusInfo = false;
            for (NimbusInfo nimbusInfo : nimbusInfoList) {
                if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
                    isListContainsCurrentNimbusInfo = true;
                    break;
                }
            }
            if (!isListContainsCurrentNimbusInfo && downloadUpdatedBlob(conf, blobStore, key, nimbusInfoList)) {
                LOG.debug("Updating state inside zookeeper for an update");
                createStateInZookeeper(conf, key, nimbusDetails);
            }
        } catch (KeeperException.NoNodeException | KeyNotFoundException e) {
            // Race condition with a delete: the key is gone, so there is nothing to update.
            return;
        } catch (Exception exp) {
            throw new RuntimeException(exp);
        }
    }
}