package org.apache.solr.filestore;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.filestore.PackageStoreAPI.MetaData;
import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
/**
 * {@link PackageStore} implementation that keeps files on the local disk below the
 * package-store directory of Solr home, registers them in ZooKeeper under
 * {@link #ZK_PACKAGESTORE}, and exchanges them with other nodes over HTTP.
 */
public class DistribPackageStore implements PackageStore {
/**
 * Maximum number of bytes read when downloading a file or its metadata from another node.
 * Defaults to 100 MB and can be overridden with the {@code max.file.store.size} system
 * property. The property key must be non-empty: {@code System.getProperty("")} throws
 * {@link IllegalArgumentException}, which would make this class fail to initialize.
 */
static final long MAX_PKG_SIZE = Long.parseLong(System.getProperty("max.file.store.size", String.valueOf(100 * 1024 * 1024)));
/** This is where all the files in the package store are listed. */
static final String ZK_PACKAGESTORE = "/packagestore";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Owning container; used to reach ZooKeeper, the shared HTTP client and the node list.
private final CoreContainer coreContainer;
// Files this node has just started distributing, keyed by their package-store path.
private Map<String, FileInfo> tmpFiles = new ConcurrentHashMap<>();
// Solr home directory of this node; the local package store lives beneath it.
private final Path solrhome;
/**
 * Creates a package store rooted at the Solr home of the given container and makes sure
 * the local package-store directory exists.
 *
 * @param coreContainer the container this store belongs to
 */
public DistribPackageStore(CoreContainer coreContainer) {
  this.coreContainer = coreContainer;
  this.solrhome = Paths.get(this.coreContainer.getSolrHome());
  // ensurePackageStoreDir is private and has no other caller; construction is the
  // natural point to create the directory (failures are only logged as warnings).
  ensurePackageStoreDir(solrhome);
}
public Path getRealpath(String path) {
return _getRealPath(path, solrhome);
private static Path _getRealPath(String path, Path solrHome) {
if (File.separatorChar == '\\') {
path = path.replace('/', File.separatorChar);
if (!path.isEmpty() && path.charAt(0) != File.separatorChar) {
path = File.separator + path;
return new File(solrHome +
File.separator + PackageStoreAPI.PACKAGESTORE_DIRECTORY + path).toPath();
/**
 * One file of the package store together with its companion metadata file.
 * Being a non-static inner class, it resolves paths and persists data through the
 * enclosing store instance.
 */
class FileInfo {
// Path of the file inside the package store.
final String path;
// Path of the companion metadata file; computed on first use by getMetaPath().
String metaPath;
// In-memory copies of the file content and of the metadata JSON, once loaded or persisted.
ByteBuffer fileData, metaData;
// Creates a handle for the given package-store path; nothing is read from disk here.
FileInfo(String path) {
this.path = path;
ByteBuffer getFileData(boolean validate) throws IOException {
if (fileData == null) {
try (FileInputStream fis = new FileInputStream(getRealpath(path).toFile())) {
fileData = SimplePostTool.inputStreamToByteArray(fis);
return fileData;
public String getMetaPath() {
if (metaPath == null) {
metaPath = _getMetapath(path);
return metaPath;
private void persistToFile(ByteBuffer data, ByteBuffer meta) throws IOException {
synchronized (DistribPackageStore.this) {
this.metaData = meta;
this.fileData = data;
_persistToFile(solrhome, path, data, meta);
if (log.isInfoEnabled()) {"persisted a file {} and metadata. sizes {} {}", path, data.limit(), meta.limit());
public boolean exists(boolean validateContent, boolean fetchMissing) throws IOException {
File file = getRealpath(path).toFile();
if (!file.exists()) {
if (fetchMissing) {
return fetchFromAnyNode();
} else {
return false;
if (validateContent) {
MetaData metaData = readMetaData();
if (metaData == null) return false;
try (InputStream is = new FileInputStream(getRealpath(path).toFile())) {
if (!Objects.equals(DigestUtils.sha512Hex(is), metaData.sha512)) {
} else {
return true;
} catch (Exception e) {
throw new SolrException(SERVER_ERROR, "unable to parse metadata json file");
} else {
return true;
return false;
private void deleteFile() {
try {
IOUtils.deleteFilesIfExist(getRealpath(path), getRealpath(getMetaPath()));
} catch (IOException e) {
log.error("Unable to delete files: {}", path);
private boolean fetchFileFromNodeAndPersist(String fromNode) {"fetching a file {} from {} ", path, fromNode);
String url = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(fromNode);
if (url == null) throw new SolrException(BAD_REQUEST, "No such node");
String baseUrl = url.replace("/solr", "/api");
ByteBuffer metadata = null;
Map m = null;
try {
metadata = Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(),
baseUrl + "/node/files" + getMetaPath(),
Utils.newBytesConsumer((int) MAX_PKG_SIZE));
m = (Map) Utils.fromJSON(metadata.array(), metadata.arrayOffset(), metadata.limit());
} catch (SolrException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching metadata", e);
try {
ByteBuffer filedata = Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(),
baseUrl + "/node/files" + path,
Utils.newBytesConsumer((int) MAX_PKG_SIZE));
String sha512 = DigestUtils.sha512Hex(new ByteBufferInputStream(filedata));
String expected = (String) m.get("sha512");
if (!sha512.equals(expected)) {
throw new SolrException(SERVER_ERROR, "sha512 mismatch downloading : " + path + " from node : " + fromNode);
persistToFile(filedata, metadata);
return true;
} catch (SolrException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching data", e);
} catch (IOException ioe) {
throw new SolrException(SERVER_ERROR, "Error persisting file", ioe);
boolean fetchFromAnyNode() {
ArrayList<String> l = coreContainer.getPackageStoreAPI().shuffledNodes();
ZkStateReader stateReader = coreContainer.getZkController().getZkStateReader();
for (String liveNode : l) {
try {
String baseurl = stateReader.getBaseUrlForNodeName(liveNode);
String url = baseurl.replace("/solr", "/api");
String reqUrl = url + "/node/files" + path +
boolean nodeHasBlob = false;
Object nl = Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), reqUrl, Utils.JAVABINCONSUMER);
if (Utils.getObjectByPath(nl, false, Arrays.asList("files", path)) != null) {
nodeHasBlob = true;
if (nodeHasBlob) {
boolean success = fetchFileFromNodeAndPersist(liveNode);
if (success) return true;
} catch (Exception e) {
//it's OK for some nodes to fail
return false;
String getSimpleName() {
int idx = path.lastIndexOf("/");
if (idx == -1) return path;
return path.substring(idx + 1);
public Path realPath() {
return getRealpath(path);
MetaData readMetaData() throws IOException {
File file = getRealpath(getMetaPath()).toFile();
if (file.exists()) {
try (InputStream fis = new FileInputStream(file)) {
return new MetaData((Map) Utils.fromJSON(fis));
return null;
public FileDetails getDetails() {
FileType type = getType(path, false);
return new FileDetails() {
public MetaData getMetaData() {
try {
return readMetaData();
} catch (Exception e) {
throw new RuntimeException(e);
public Date getTimeStamp() {
return new Date(realPath().toFile().lastModified());
public boolean isDir() {
return type == FileType.DIRECTORY;
public long size() {
return realPath().toFile().length();
public void writeMap(EntryWriter ew) throws IOException {
MetaData metaData = readMetaData();
ew.put(CommonParams.NAME, getSimpleName());
if (type == FileType.DIRECTORY) {
ew.put("dir", true);
ew.put("size", size());
ew.put("timestamp", getTimeStamp());
if (metaData != null)
public void readData(Consumer<FileEntry> consumer) throws IOException {
MetaData meta = readMetaData();
try (InputStream is = new FileInputStream(realPath().toFile())) {
consumer.accept(new FileEntry(null, meta, path) {
public InputStream getInputStream() {
return is;
public void put(FileEntry entry) throws IOException {
FileInfo info = new FileInfo(entry.path);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Utils.writeJson(entry.getMetaData(), baos, true);
byte[] bytes = baos.toByteArray();
info.persistToFile(entry.buf, ByteBuffer.wrap(bytes, 0, bytes.length));
private void distribute(FileInfo info) {
try {
String dirName = info.path.substring(0, info.path.lastIndexOf('/'));
coreContainer.getZkController().getZkClient().makePath(ZK_PACKAGESTORE + dirName, false, true);
coreContainer.getZkController().getZkClient().create(ZK_PACKAGESTORE + info.path, info.getDetails().getMetaData().sha512.getBytes(UTF_8),
CreateMode.PERSISTENT, true);
} catch (Exception e) {
throw new SolrException(SERVER_ERROR, "Unable to create an entry in ZK", e);
tmpFiles.put(info.path, info);
List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes();
int i = 0;
String myNodeName = coreContainer.getZkController().getNodeName();
try {
for (String node : nodes) {
String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
String url = baseUrl.replace("/solr", "/api") + "/node/files" + info.path + "?getFrom=";
if (i < FETCHFROM_SRC) {
// this is to protect very large clusters from overwhelming a single node
// the first FETCHFROM_SRC nodes will be asked to fetch from this node.
// it's there in the memory now. So , it must be served fast
url += myNodeName;
} else {
if (i == FETCHFROM_SRC) {
// This is just an optimization
// at this point a bunch of nodes are already downloading from me
// I'll wait for them to finish before asking other nodes to download from each other
try {
Thread.sleep(2 * 1000);
} catch (Exception e) {
// trying to avoid the thundering herd problem when there are a very large no:of nodes
// others should try to fetch it from any node where it is available. By now,
// almost FETCHFROM_SRC other nodes may have it
url += "*";
try {
//fire and forget
Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), url, null);
} catch (Exception e) {"Node: {} failed to respond for file fetch notification", node, e);
//ignore the exception
// some nodes may be down or not responding
} finally {
coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> {
try {
Thread.sleep(10 * 1000);
} finally {
return null;
public boolean fetch(String path, String from) {
if (path == null || path.isEmpty()) return false;
FileInfo f = new FileInfo(path);
try {
if (f.exists(true, false)) {
return true;
} catch (IOException e) {
log.error("Error fetching file ", e);
return false;
if (from == null || "*".equals(from)) {"Missing file in package store: {}", path);
if (f.fetchFromAnyNode()) {"Successfully downloaded : {}", path);
return true;
} else {"Unable to download file : {}", path);
return false;
} else {
return false;
public void get(String path, Consumer<FileEntry> consumer, boolean fetchmissing) throws IOException {
File file = getRealpath(path).toFile();
String simpleName = file.getName();
if (isMetaDataFile(simpleName)) {
try (InputStream is = new FileInputStream(file)) {
consumer.accept(new FileEntry(null, null, path) {
//no metadata for metadata file
public InputStream getInputStream() {
return is;
new FileInfo(path).readData(consumer);
public void syncToAllNodes(String path) throws IOException {
FileInfo fi = new FileInfo(path);
if (!fi.exists(true, false)) {
throw new SolrException(BAD_REQUEST, "No such file : " + path);
public List<FileDetails> list(String path, Predicate<String> predicate) {
File file = getRealpath(path).toFile();
List<FileDetails> fileDetails = new ArrayList<>();
FileType type = getType(path, false);
if (type == FileType.DIRECTORY) {
file.list((dir, name) -> {
if (predicate == null || predicate.test(name)) {
if (!isMetaDataFile(name)) {
fileDetails.add(new FileInfo(path + "/" + name).getDetails());
return false;
} else if (type == FileType.FILE) {
fileDetails.add(new FileInfo(path).getDetails());
return fileDetails;
public void delete(String path) {
List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes();
HttpClient client = coreContainer.getUpdateShardHandler().getDefaultHttpClient();
for (String node : nodes) {
String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
String url = baseUrl.replace("/solr", "/api") + "/node/files" + path;
HttpDelete del = new HttpDelete(url);
coreContainer.runAsync(() -> Utils.executeHttpMethod(client, url, null, del));//invoke delete command on all nodes asynchronously
private void checkInZk(String path) {
try {
//fail if file exists
if (coreContainer.getZkController().getZkClient().exists(ZK_PACKAGESTORE + path, true)) {
throw new SolrException(BAD_REQUEST, "The path exist ZK, delete and retry");
} catch (SolrException se) {
throw se;
} catch (Exception e) {
log.error("Could not connect to ZK", e);
public void deleteLocal(String path) {
FileInfo f = new FileInfo(path);
public void refresh(String path) {
try {
List l = null;
try {
l = coreContainer.getZkController().getZkClient().getChildren(ZK_PACKAGESTORE + path, null, true);
} catch (KeeperException.NoNodeException e) {
// does not matter
if (l != null && !l.isEmpty()) {
List myFiles = list(path, s -> true);
for (Object f : l) {
if (!myFiles.contains(f)) {"{} does not exist locally, downloading.. ", f);
fetch(path + "/" + f.toString(), "*");
} catch (Exception e) {
log.error("Could not refresh files in {}", path, e);
public FileType getType(String path, boolean fetchMissing) {
File file = getRealpath(path).toFile();
if (!file.exists() && fetchMissing) {
if (fetch(path, null)) {
file = getRealpath(path).toFile();
return _getFileType(file);
public static FileType _getFileType(File file) {
if (!file.exists()) return FileType.NOFILE;
if (file.isDirectory()) return FileType.DIRECTORY;
return isMetaDataFile(file.getName()) ? FileType.METADATA : FileType.FILE;
public static boolean isMetaDataFile(String file) {
return file.charAt(0) == '.' && file.endsWith(".json");
private void ensurePackageStoreDir(Path solrHome) {
final File packageStoreDir = getPackageStoreDirPath(solrHome).toFile();
if (!packageStoreDir.exists()) {
try {
final boolean created = packageStoreDir.mkdirs();
if (!created) {
log.warn("Unable to create [{}] directory in SOLR_HOME [{}]. Features requiring this directory may fail.", packageStoreDir, solrHome);
} catch (Exception e) {
log.warn("Unable to create [{}] directory in SOLR_HOME [{}]. Features requiring this directory may fail.", packageStoreDir, solrHome, e);
public static Path getPackageStoreDirPath(Path solrHome) {
return solrHome.resolve(PackageStoreAPI.PACKAGESTORE_DIRECTORY);
private static String _getMetapath(String path) {
int idx = path.lastIndexOf('/');
return path.substring(0, idx + 1) + "." + path.substring(idx + 1) + ".json";
* Internal API
public static void _persistToFile(Path solrHome, String path, ByteBuffer data, ByteBuffer meta) throws IOException {
Path realpath = _getRealPath(path, solrHome);
File file = realpath.toFile();
File parent = file.getParentFile();
if (!parent.exists()) {
Map m = (Map) Utils.fromJSON(meta.array(), meta.arrayOffset(), meta.limit());
if (m == null || m.isEmpty()) {
throw new SolrException(SERVER_ERROR, "invalid metadata , discarding : " + path);
File metdataFile = _getRealPath(_getMetapath(path), solrHome).toFile();
try (FileOutputStream fos = new FileOutputStream(metdataFile)) {
fos.write(meta.array(), 0, meta.limit());
IOUtils.fsync(metdataFile.toPath(), false);
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(data.array(), 0, data.limit());
IOUtils.fsync(file.toPath(), false);
public Map<String, byte[]> getKeys() throws IOException {
return _getKeys(solrhome);
// reads local keys file
private static Map<String, byte[]> _getKeys(Path solrhome) throws IOException {
Map<String, byte[]> result = new HashMap<>();
Path keysDir = _getRealPath(PackageStoreAPI.KEYS_DIR, solrhome);
File[] keyFiles = keysDir.toFile().listFiles();
if (keyFiles == null) return result;
for (File keyFile : keyFiles) {
if (keyFile.isFile() && !isMetaDataFile(keyFile.getName())) {
try (InputStream fis = new FileInputStream(keyFile)) {
result.put(keyFile.getName(), SimplePostTool.inputStreamToByteArray(fis).array());
return result;
public static void deleteZKFileEntry(SolrZkClient client, String path) {
try {
client.delete(ZK_PACKAGESTORE + path, -1, true);
} catch (KeeperException | InterruptedException e) {
log.error("", e);