blob: 4ede2174b1d275eefdd665f4fbbace6f89b31204 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.common.SolrException.ErrorCode.SERVICE_UNAVAILABLE;
/**
* The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
*/
public class BlobRepository {
@Deprecated
private static final long OLD_MAX_JAR_SIZE = Long.parseLong(
System.getProperty("runtme.lib.size", String.valueOf(5 * 1024 * 1024)));
private static final long MAX_JAR_SIZE = Long.parseLong(
System.getProperty("runtime.lib.size", String.valueOf(OLD_MAX_JAR_SIZE)));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final Random RANDOM;
static final Pattern BLOB_KEY_PATTERN_CHECKER = Pattern.compile(".*/\\d+");
static {
// We try to make things reproducible in the context of our tests by initializing the random instance
// based on the current seed
String seed = System.getProperty("tests.seed");
if (seed == null) {
RANDOM = new Random();
} else {
RANDOM = new Random(seed.hashCode());
}
}
private final CoreContainer coreContainer;
@SuppressWarnings({"rawtypes"})
private Map<String, BlobContent> blobs = createMap();
// for unit tests to override
@SuppressWarnings({"rawtypes"})
ConcurrentHashMap<String, BlobContent> createMap() {
return new ConcurrentHashMap<>();
}
public BlobRepository(CoreContainer coreContainer) {
this.coreContainer = coreContainer;
}
// I wanted to {@link SolrCore#loadDecodeAndCacheBlob(String, Decoder)} below but precommit complains
/**
* Returns the contents of a blob containing a ByteBuffer and increments a reference count. Please return the
* same object to decrease the refcount. This is normally used for storing jar files, and binary raw data.
* If you are caching Java Objects you want to use {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)}
*
* @param key it is a combination of blobname and version like blobName/version
* @return The reference of a blob
*/
public BlobContentRef<ByteBuffer> getBlobIncRef(String key) {
return getBlobIncRef(key, () -> addBlob(key));
}
/**
* Internal method that returns the contents of a blob and increments a reference count. Please return the same
* object to decrease the refcount. Only the decoded content will be cached when this method is used. Component
* authors attempting to share objects across cores should use
* {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)} which ensures that a proper close hook is also created.
*
* @param key it is a combination of blob name and version like blobName/version
* @param decoder a decoder that knows how to interpret the bytes from the blob
* @return The reference of a blob
*/
BlobContentRef<Object> getBlobIncRef(String key, Decoder<Object> decoder) {
return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key, decoder));
}
@SuppressWarnings({"unchecked", "rawtypes"})
BlobContentRef getBlobIncRef(String key, Decoder decoder, String url, String sha512) {
StringBuilder keyBuilder = new StringBuilder(key);
if (decoder != null) keyBuilder.append(decoder.getName());
keyBuilder.append("/").append(sha512);
return getBlobIncRef(keyBuilder.toString(), () -> new BlobContent<>(key, fetchBlobAndVerify(key, url, sha512), decoder));
}
// do the actual work returning the appropriate type...
@SuppressWarnings({"unchecked"})
private <T> BlobContentRef<T> getBlobIncRef(String key, Callable<BlobContent<T>> blobCreator) {
BlobContent<T> aBlob;
if (this.coreContainer.isZooKeeperAware()) {
synchronized (blobs) {
aBlob = blobs.get(key);
if (aBlob == null) {
try {
aBlob = blobCreator.call();
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed: " + e.getMessage(), e);
}
}
}
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading is not supported in non-cloud mode");
// todo
}
BlobContentRef<T> ref = new BlobContentRef<>(aBlob);
synchronized (aBlob.references) {
aBlob.references.add(ref);
}
return ref;
}
// For use cases sharing raw bytes
private BlobContent<ByteBuffer> addBlob(String key) {
ByteBuffer b = fetchBlob(key);
BlobContent<ByteBuffer> aBlob = new BlobContent<>(key, b);
blobs.put(key, aBlob);
return aBlob;
}
// for use cases sharing java objects
private BlobContent<Object> addBlob(String key, Decoder<Object> decoder) {
ByteBuffer b = fetchBlob(key);
String keyPlusName = key + decoder.getName();
BlobContent<Object> aBlob = new BlobContent<>(keyPlusName, b, decoder);
blobs.put(keyPlusName, aBlob);
return aBlob;
}
static String INVALID_JAR_MSG = "Invalid jar from {0} , expected sha512 hash : {1} , actual : {2}";
private ByteBuffer fetchBlobAndVerify(String key, String url, String sha512) {
ByteBuffer byteBuffer = fetchFromUrl(key, url);
String computedDigest = sha512Digest(byteBuffer);
if (!computedDigest.equals(sha512)) {
throw new SolrException(SERVER_ERROR, StrUtils.formatString(INVALID_JAR_MSG, url, sha512, computedDigest));
}
return byteBuffer;
}
public static String sha512Digest(ByteBuffer byteBuffer) {
MessageDigest digest = null;
try {
digest = MessageDigest.getInstance("SHA-512");
} catch (NoSuchAlgorithmException e) {
//unlikely
throw new SolrException(SERVER_ERROR, e);
}
digest.update(byteBuffer);
return String.format(
Locale.ROOT,
"%0128x",
new BigInteger(1, digest.digest()));
}
/**
* Package local for unit tests only please do not use elsewhere
*/
ByteBuffer fetchBlob(String key) {
Replica replica = getSystemCollReplica();
String url = replica.getBaseUrl() + "/" + CollectionAdminParams.SYSTEM_COLL + "/blob/" + key + "?wt=filestream";
return fetchFromUrl(key, url);
}
ByteBuffer fetchFromUrl(String key, String url) {
HttpClient httpClient = coreContainer.getUpdateShardHandler().getDefaultHttpClient();
HttpGet httpGet = new HttpGet(url);
ByteBuffer b;
HttpResponse response = null;
HttpEntity entity = null;
try {
response = httpClient.execute(httpGet);
entity = response.getEntity();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
}
try (InputStream is = entity.getContent()) {
b = SimplePostTool.inputStreamToByteArray(is, MAX_JAR_SIZE);
}
} catch (Exception e) {
if (e instanceof SolrException) {
throw (SolrException) e;
} else {
throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
}
} finally {
Utils.consumeFully(entity);
}
return b;
}
private Replica getSystemCollReplica() {
ZkStateReader zkStateReader = this.coreContainer.getZkController().getZkStateReader();
ClusterState cs = zkStateReader.getClusterState();
DocCollection coll = cs.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
if (coll == null)
throw new SolrException(SERVICE_UNAVAILABLE, CollectionAdminParams.SYSTEM_COLL + " collection not available");
ArrayList<Slice> slices = new ArrayList<>(coll.getActiveSlices());
if (slices.isEmpty())
throw new SolrException(SERVICE_UNAVAILABLE, "No active slices for " + CollectionAdminParams.SYSTEM_COLL + " collection");
Collections.shuffle(slices, RANDOM); //do load balancing
Replica replica = null;
for (Slice slice : slices) {
List<Replica> replicas = new ArrayList<>(slice.getReplicasMap().values());
Collections.shuffle(replicas, RANDOM);
for (Replica r : replicas) {
if (r.getState() == Replica.State.ACTIVE) {
if (zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))) {
replica = r;
break;
} else {
if (log.isInfoEnabled()) {
log.info("replica {} says it is active but not a member of live nodes", r.get(ZkStateReader.NODE_NAME_PROP));
}
}
}
}
}
if (replica == null) {
throw new SolrException(SERVICE_UNAVAILABLE, "No active replica available for " + CollectionAdminParams.SYSTEM_COLL + " collection");
}
return replica;
}
/**
* This is to decrement a ref count
*
* @param ref The reference that is already there. Doing multiple calls with same ref will not matter
*/
public void decrementBlobRefCount(@SuppressWarnings({"rawtypes"})BlobContentRef ref) {
if (ref == null) return;
synchronized (ref.blob.references) {
if (!ref.blob.references.remove(ref)) {
log.error("Multiple releases for the same reference");
}
if (ref.blob.references.isEmpty()) {
blobs.remove(ref.blob.key);
}
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
public static class BlobContent<T> {
public final String key;
private final T content; // holds byte buffer or cached object, holding both is a waste of memory
// ref counting mechanism
private final Set<BlobContentRef> references = new HashSet<>();
public BlobContent(String key, ByteBuffer buffer, Decoder<T> decoder) {
this.key = key;
this.content = decoder == null ? (T) buffer : decoder.decode(new ByteBufferInputStream(buffer));
}
@SuppressWarnings("unchecked")
public BlobContent(String key, ByteBuffer buffer) {
this.key = key;
this.content = (T) buffer;
}
/**
* Get the cached object.
*
* @return the object representing the content that is cached.
*/
public T get() {
return this.content;
}
}
public interface Decoder<T> {
/**
* A name by which to distinguish this decoding. This only needs to be implemented if you want to support
* decoding the same blob content with more than one decoder.
*
* @return The name of the decoding, defaults to empty string.
*/
default String getName() {
return "";
}
/**
* A routine that knows how to convert the stream of bytes from the blob into a Java object.
*
* @param inputStream the bytes from a blob
* @return A Java object of the specified type.
*/
T decode(InputStream inputStream);
}
public static class BlobContentRef<T> {
public final BlobContent<T> blob;
private BlobContentRef(BlobContent<T> blob) {
this.blob = blob;
}
}
}