blob: 7d6a73be6cec77ea23dfabb153cd014e29089f90 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.solr.core;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.http.client.HttpClient;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.api.CallInfo;
import org.apache.solr.api.Command;
import org.apache.solr.api.EndPoint;
import org.apache.solr.api.V2HttpCall;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.MapWriter.EMPTY;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.common.params.CommonParams.PACKAGE;
import static org.apache.solr.core.BlobRepository.sha256Digest;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
* This class represents the new P2P, File System Store.
* This identifies a file by its sha256 + filename.
* This acts as a server for files for a user or any other node to
* download a file.
* This also is responsible for distributing a file across the nodes
* in the cluster.
public class DistribFileStore {
static final long MAX_PKG_SIZE = Long.parseLong(System.getProperty("max.package.size", String.valueOf(100 * 1024 * 1024)));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CoreContainer coreContainer;
private Map<String, ByteBuffer> tmpFiles = new ConcurrentHashMap<>();
final FileStoreRead fileStoreRead = new FileStoreRead();
public DistribFileStore(CoreContainer coreContainer) {
this.coreContainer = coreContainer;
public MapWriter fileList(SolrParams params) {
String id = params.get(CommonParams.ID);
File dir = getFileStorePath().toFile();
String fromNode = params.get("fromNode");
if (id != null && fromNode != null) {
//asking to fetch it from somewhere if it does not exist locally
if (!new File(dir, id).exists()) {
if ("*".equals(fromNode)) {
//asking to fetch from a random node
return EMPTY;
} else { // asking to fetch from a specific node
fetchFileFromNodeAndPersist(id, fromNode);
return MapWriter.EMPTY;
return ew -> dir.listFiles((f, name) -> {
if (id == null || name.equals(id)) {
ew.putNoEx(name, (MapWriter) ew1 -> {
File file = new File(f, name);
ew1.put("size", file.length());
ew1.put("timestamp", new Date(file.lastModified()));
return false;
public Path getFileStorePath() {
Path blobsDirPath = SolrResourceLoader.getFileStoreDirPath(this.coreContainer.getResourceLoader().getInstancePath());
return new File(blobsDirPath.toFile(), PACKAGE).toPath();
private ByteBuffer fetchFromOtherNodes(String id) {
ByteBuffer[] result = new ByteBuffer[1];
ArrayList<String> l = shuffledNodes();
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add(CommonParams.ID, id);
ZkStateReader stateReader = coreContainer.getZkController().getZkStateReader();
for (String liveNode : l) {
try {
String baseurl = stateReader.getBaseUrlForNodeName(liveNode);
String url = baseurl.replace("/solr", "/api");
String reqUrl = url + "/node/filestore/package?wt=javabin&omitHeader=true&id=" + id;
boolean nodeHasBlob = false;
Object nl = Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), reqUrl, Utils.JAVABINCONSUMER);
if (Utils.getObjectByPath(nl, false, Arrays.asList("blob", id)) != null) {
nodeHasBlob = true;
if (nodeHasBlob) {
result[0] = fetchFileFromNodeAndPersist(id, liveNode);
if (result[0] != null) break;
} catch (Exception e) {
//it's OK for some nodes to fail
return result[0];
* get a list of nodes randomly shuffled
* * @lucene.internal
public ArrayList<String> shuffledNodes() {
Set<String> liveNodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
ArrayList<String> l = new ArrayList(liveNodes);
Collections.shuffle(l, BlobRepository.RANDOM);
return l;
public static class FileObjName {
final String sha256;
final String fname;
FileObjName(String name) {
int idx = name.indexOf('-');
if (idx == -1) {
sha256 = name;
fname = null;
} else {
sha256 = name.substring(0, idx);
fname = name.substring(idx + 1);
public boolean equals(Object obj) {
if (obj instanceof FileObjName) {
FileObjName that = (FileObjName) obj;
return Objects.equals(this.sha256, that.sha256) && Objects.equals(this.fname, that.fname);
return false;
public String name() {
return fname == null ? sha256 : sha256 + "-" + fname;
public String toString() {
return name();
private void persistToFile(ByteBuffer b, String id) throws IOException {
FileObjName fileObjName = new FileObjName(id);
String actual = sha256Digest(b);
if (!Objects.equals(actual, fileObjName.sha256)) {
throw new SolrException(SERVER_ERROR, "invalid id for blob actual: " + actual + " expected : " + fileObjName.sha256);
File file = new File(getFileStorePath().toFile(), id);
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(b.array(), 0, b.limit());
}"persisted a blob {} ", id);
IOUtils.fsync(file.toPath(), false);
boolean fetchFile(String id) {
File f = new File(getFileStorePath().toFile(), id);
if (f.exists()) return true;
return f.exists();
* Read a blob from the blobstore file system
public void readFile(String id, Consumer<InputStream> consumer) throws IOException {
if (!fetchFile(id)) throw new FileNotFoundException("No such file: " + id);
File f = new File(getFileStorePath().toFile(), id);
try (InputStream is = new FileInputStream(f)) {
* This distributes a blob to all nodes in the cluster
public void distributeFile(ByteBuffer buf, String id) throws IOException {
persistToFile(buf, id);
tmpFiles.put(id, buf);
List<String> nodes = coreContainer.getFileStore().shuffledNodes();
int i = 0;
try {
for (String node : nodes) {
String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
String url = baseUrl.replace("/solr", "/api") + "/node/filestore/package?id=" + id + "&fromNode=";
if (i < FETCHFROM_SRC) {
// this is to protect very large clusters from overwhelming a single node
// the first FETCHFROM_SRC nodes will be asked to fetch from this node.
// it's there in the memory now. So , it must be served fast
url += coreContainer.getZkController().getNodeName();
} else {
if (i == FETCHFROM_SRC) {
// This is just an optimization
// at this point a bunch of nodes are already downloading from me
// I'll wait for them to finish before asking other nodes to download from each other
try {
Thread.sleep(2 * 1000);
} catch (Exception e) {
// trying to avoid the thundering herd problem when there are a very large no:of nodes
// others should try to fetch it from any node where it is available. By now,
// almost FETCHFROM_SRC other nodes may have it
url += "*";
try {
//fire and forget
Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), url, null);
} catch (Exception e) {"Node: " + node +
" failed to respond for blob notification", e);
//ignore the exception
// some nodes may be down or not responding
} finally {
new Thread(() -> {
try {
// keep the jar in memory for 10 secs , so that
//every node can download it from memory without the file system
Thread.sleep(10 * 1000);
} catch (Exception e) {
//don't care
} finally {
private ByteBuffer fetchFileFromNodeAndPersist(String id, String fromNode) {"fetching a blob {} from {} ", id, fromNode);
ByteBuffer[] result = new ByteBuffer[1];
String url = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(fromNode);
if (url == null) throw new SolrException(BAD_REQUEST, "No such node");
coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> {
String fromUrl = url.replace("/solr", "/api") + "/node/filestore/package/" + id;
try {
HttpClient httpClient = coreContainer.getUpdateShardHandler().getDefaultHttpClient();
result[0] = Utils.executeGET(httpClient, fromUrl, Utils.newBytesConsumer((int) MAX_PKG_SIZE));
String actualSha256 = sha256Digest(result[0]);
FileObjName fileObjName = new FileObjName(id);
if (fileObjName.sha256.equals(actualSha256)) {
persistToFile(result[0], id);
} else {
result[0] = null;
log.error("expected sha256 : {} actual sha256: {} from blob downloaded from {} ", fileObjName.sha256, actualSha256, fromNode);
} catch (IOException e) {
log.error("Unable to fetch jar: {} from node: {}", id, fromNode);
return result[0];
@EndPoint(spec = "node.filestore.GET",
permission = PermissionNameProvider.Name.FILESTORE_READ)
public class FileStoreRead {
public void get(CallInfo info) {
SolrQueryRequest req = info.req;
SolrQueryResponse rsp = info.rsp;
String id = ((V2HttpCall) req.getHttpSolrCall()).getUrlParts().get(CommonParams.ID);
if (id == null) {
rsp.add("files",Collections.singletonMap(PACKAGE, fileList(req.getParams())));
} else {
if (!fetchFile(id)) {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "No such blob");
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.add(CommonParams.WT, FILE_STREAM);
req.setParams(SolrParams.wrapDefaults(solrParams, req.getParams()));
rsp.add(FILE_STREAM, (SolrCore.RawWriter) os -> {
ByteBuffer b = tmpFiles.get(id);
if (b != null) {
os.write(b.array(), b.arrayOffset(), b.limit());
} else {
File file = new File(getFileStorePath().toFile(), id);
try (FileInputStream is = new FileInputStream(file)) {, os);