// blob: 02d5e2d2dcc907d73f99b8b8940a62747f6fee5f [file] [log] [blame]
package backtype.storm.codedistributor;
import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.utils.ZookeeperAuthInfo;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static backtype.storm.Config.*;
import static backtype.storm.utils.Utils.downloadFromHost;
import static backtype.storm.utils.Utils.newCurator;
/**
 * {@link ICodeDistributor} that works against the local file system.
 *
 * <p>{@link #upload} writes a meta file into the given directory listing the absolute paths of
 * the files found there. {@link #download} reads such a meta file and fetches each listed path
 * from one of the hosts registered as children of {@code /code-distributor/<topologyId>} in
 * ZooKeeper.
 *
 * <p>{@link #prepare} must be called before {@link #download} or {@link #getReplicationCount},
 * because those methods use the ZooKeeper client created there.
 */
public class LocalFileSystemCodeDistributor implements ICodeDistributor {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);

    /** Name of the meta file that {@link #upload} creates inside the uploaded directory. */
    private static final String META_FILE_NAME = "storm-code-distributor.meta";

    /** ZooKeeper path prefix; the topology id is appended to form the node whose children are read. */
    private static final String ZK_CODE_DISTRIBUTOR_ROOT = "/code-distributor/";

    private CuratorFramework zkClient;
    private Map conf;

    /**
     * Stores the configuration and creates and starts the ZooKeeper client.
     *
     * @param conf configuration map; read for the ZooKeeper servers, port and root
     * @throws IllegalArgumentException if the configured ZooKeeper port is missing or not a number
     * @throws Exception if the ZooKeeper client cannot be created or started
     */
    @Override
    public void prepare(Map conf) throws Exception {
        this.conf = conf;
        @SuppressWarnings("unchecked")
        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
        // Accept any Number (not just Integer) and fail with a clear message instead of an
        // NPE/ClassCastException when the port is missing or has the wrong type.
        Object portValue = conf.get(STORM_ZOOKEEPER_PORT);
        if (!(portValue instanceof Number)) {
            throw new IllegalArgumentException(
                    STORM_ZOOKEEPER_PORT + " must be set to a number but was: " + portValue);
        }
        int port = ((Number) portValue).intValue();
        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
        zkClient.start();
    }

    /**
     * Creates the meta file for a directory, listing the absolute path of every entry in it.
     *
     * <p>If the meta file already exists it is left untouched and a warning is logged.
     *
     * @param dirPath    directory whose entries are recorded in the meta file
     * @param topologyId topology id; not used by this implementation
     * @return the meta file located in {@code dirPath}
     * @throws IOException if {@code dirPath} cannot be listed or the meta file cannot be written
     */
    @Override
    public File upload(String dirPath, String topologyId) throws Exception {
        File destDir = new File(dirPath);
        // listFiles() returns null (not an empty array) when the path is not a directory or an
        // I/O error occurs, so it has to be checked before iterating.
        File[] localFiles = destDir.listFiles();
        if (localFiles == null) {
            throw new IOException("Cannot list files in " + destDir.getAbsolutePath()
                    + ": not a directory or an I/O error occurred");
        }

        // The listing is taken before the meta file is created, so a freshly created meta file
        // never lists itself.
        List<String> filePaths = new ArrayList<String>(localFiles.length);
        for (File file : localFiles) {
            filePaths.add(file.getAbsolutePath());
        }

        File metaFile = new File(destDir, META_FILE_NAME);
        if (metaFile.createNewFile()) {
            FileUtils.writeLines(metaFile, filePaths);
            LOG.info("Created meta file {} upload successful.", metaFile.getAbsolutePath());
        } else {
            LOG.warn("metafile {} already exists.", metaFile.getAbsolutePath());
        }
        return metaFile;
    }

    /**
     * Downloads every file listed in the meta file into the meta file's parent directory.
     *
     * <p>For each listed path, the hosts registered in ZooKeeper for the topology are tried in
     * order until one download succeeds.
     *
     * @param topologyid topology id used to look up the candidate hosts in ZooKeeper
     * @param metafile   meta file containing one remote absolute path per line
     * @return the local files that were downloaded, in meta-file order
     * @throws RuntimeException if a listed file could not be downloaded from any host; the last
     *                          download failure, if any, is attached as the cause
     * @throws Exception if the ZooKeeper lookup or reading the meta file fails
     */
    @Override
    public List<File> download(String topologyid, File metafile) throws Exception {
        List<String> hostInfos = zkClient.getChildren().forPath(ZK_CODE_DISTRIBUTOR_ROOT + topologyid);
        File destDir = metafile.getParentFile();
        List<File> downloadedFiles = Lists.newArrayList();
        for (String absolutePathOnRemote : FileUtils.readLines(metafile)) {
            // The local copy keeps the remote file's name but lives next to the meta file.
            File localFile = new File(destDir, new File(absolutePathOnRemote).getName());
            boolean isSuccess = false;
            Exception lastFailure = null;
            for (String hostAndPort : hostInfos) {
                NimbusInfo nimbusInfo = NimbusInfo.parse(hostAndPort);
                try {
                    LOG.info("Attempting to download file {} from remote {}", absolutePathOnRemote, nimbusInfo.toHostPortString());
                    downloadFromHost(conf, absolutePathOnRemote, localFile.getAbsolutePath(), nimbusInfo.getHost(), nimbusInfo.getPort());
                    downloadedFiles.add(localFile);
                    isSuccess = true;
                    break;
                } catch (Exception e) {
                    // Deliberately broad: any failure on one host just means the next one is tried.
                    LOG.error("download failed from {}:{}, will try another endpoint ", nimbusInfo.getHost(), nimbusInfo.getPort(), e);
                    lastFailure = e;
                }
            }
            if (!isSuccess) {
                // lastFailure is null when there were no hosts to try at all.
                throw new RuntimeException("File " + absolutePathOnRemote + " could not be downloaded from any endpoint", lastFailure);
            }
        }
        return downloadedFiles;
    }

    /**
     * Returns the number of children registered in ZooKeeper for the topology.
     *
     * @param topologyId topology id whose ZooKeeper node is inspected
     * @return the child count, narrowed to {@code short}
     * @throws Exception if the ZooKeeper lookup fails
     */
    @Override
    public short getReplicationCount(String topologyId) throws Exception {
        return (short) zkClient.getChildren().forPath(ZK_CODE_DISTRIBUTOR_ROOT + topologyId).size();
    }

    /**
     * Does nothing in this implementation.
     *
     * @param topologyid topology id; ignored
     */
    @Override
    public void cleanup(String topologyid) throws IOException {
        //no op.
    }

    /**
     * Closes the ZooKeeper client if {@link #prepare} created one.
     *
     * @param conf configuration map; ignored
     */
    @Override
    public void close(Map conf) {
        // Guard against close() being called when prepare() never ran or failed early.
        if (zkClient != null) {
            zkClient.close();
        }
    }
}