| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.janusgraph.diskstorage.solr; |
| |
| import static org.janusgraph.diskstorage.solr.SolrIndex.*; |
| import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE; |
| |
| import java.io.IOException; |
| import java.io.StringReader; |
| import java.io.UncheckedIOException; |
| import java.lang.reflect.Constructor; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.time.Instant; |
| import java.util.AbstractMap.SimpleEntry; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Spliterator; |
| import java.util.Spliterators; |
| import java.util.TimeZone; |
| import java.util.UUID; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import java.util.stream.StreamSupport; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpEntityEnclosingRequest; |
| import org.apache.http.HttpException; |
| import org.apache.http.HttpRequest; |
| import org.apache.http.HttpRequestInterceptor; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.entity.BufferedHttpEntity; |
| import org.apache.http.impl.auth.KerberosScheme; |
| import org.apache.http.protocol.HttpContext; |
| import org.apache.lucene.analysis.CachingTokenFilter; |
| import org.apache.lucene.analysis.Tokenizer; |
| import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrQuery; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.impl.HttpClientUtil; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.impl.Krb5HttpClientBuilder; |
| import org.apache.solr.client.solrj.impl.LBHttpSolrClient; |
| import org.apache.solr.client.solrj.impl.PreemptiveAuth; |
| import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.response.CollectionAdminResponse; |
| import org.apache.solr.client.solrj.response.QueryResponse; |
| import org.apache.solr.client.solrj.util.ClientUtils; |
| import org.apache.solr.common.SolrDocument; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.zookeeper.KeeperException; |
| import org.janusgraph.core.Cardinality; |
| import org.janusgraph.core.JanusGraphElement; |
| import org.janusgraph.core.attribute.Cmp; |
| import org.janusgraph.core.attribute.Geo; |
| import org.janusgraph.core.attribute.Geoshape; |
| import org.janusgraph.core.attribute.Text; |
| import org.janusgraph.core.schema.Mapping; |
| import org.janusgraph.core.schema.Parameter; |
| import org.janusgraph.diskstorage.BackendException; |
| import org.janusgraph.diskstorage.BaseTransaction; |
| import org.janusgraph.diskstorage.BaseTransactionConfig; |
| import org.janusgraph.diskstorage.BaseTransactionConfigurable; |
| import org.janusgraph.diskstorage.PermanentBackendException; |
| import org.janusgraph.diskstorage.TemporaryBackendException; |
| import org.janusgraph.diskstorage.configuration.ConfigOption; |
| import org.janusgraph.diskstorage.configuration.Configuration; |
| import org.janusgraph.diskstorage.indexing.IndexEntry; |
| import org.janusgraph.diskstorage.indexing.IndexFeatures; |
| import org.janusgraph.diskstorage.indexing.IndexMutation; |
| import org.janusgraph.diskstorage.indexing.IndexProvider; |
| import org.janusgraph.diskstorage.indexing.IndexQuery; |
| import org.janusgraph.diskstorage.indexing.KeyInformation; |
| import org.janusgraph.diskstorage.indexing.RawQuery; |
| import org.janusgraph.diskstorage.solr.transform.GeoToWktConverter; |
| import org.janusgraph.diskstorage.util.DefaultTransaction; |
| import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; |
| import org.janusgraph.graphdb.database.serialize.AttributeUtils; |
| import org.janusgraph.graphdb.internal.Order; |
| import org.janusgraph.graphdb.query.JanusGraphPredicate; |
| import org.janusgraph.graphdb.query.condition.And; |
| import org.janusgraph.graphdb.query.condition.Condition; |
| import org.janusgraph.graphdb.query.condition.Not; |
| import org.janusgraph.graphdb.query.condition.Or; |
| import org.janusgraph.graphdb.query.condition.PredicateCondition; |
| import org.janusgraph.graphdb.types.ParameterType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * NOTE: Copied from JanusGraph for supporting Kerberos and adding support for multiple zookeeper clients. Do not change |
| * This is a copy of SolrIndex.java from org.janusgraph.diskstorage.solr |
| */ |
| @PreInitializeConfigOptions |
| public class Solr6Index implements IndexProvider { |
| |
| private static final Logger logger = LoggerFactory.getLogger(Solr6Index.class); |
| |
| private static final String DEFAULT_ID_FIELD = "id"; |
| private static final char CHROOT_START_CHAR = '/'; |
| |
| private static Solr6Index instance = null; |
| public static final ConfigOption<Boolean> CREATE_SOLR_CLIENT_PER_REQUEST = new ConfigOption(SOLR_NS, "create-client-per-request", "when false, allows the sharing of solr client across other components.", org.janusgraph.diskstorage.configuration.ConfigOption.Type.LOCAL, false); |
| |
| public enum Mode { |
| HTTP, CLOUD; |
| |
| public static Mode parse(String mode) { |
| for (final Mode m : Mode.values()) { |
| if (m.toString().equalsIgnoreCase(mode)) return m; |
| } |
| throw new IllegalArgumentException("Unrecognized mode: "+mode); |
| } |
| |
| } |
| |
    // Zookeeper ensemble used in CLOUD mode; each entry may carry a chroot suffix ("host:port/chroot").
    public static final ConfigOption<String[]> ZOOKEEPER_URLS = new ConfigOption<>(SOLR_NS,"zookeeper-urls",
            "URL of the Zookeeper instance coordinating the SolrCloud cluster",
            ConfigOption.Type.MASKABLE, new String[]{"localhost:2181"});

    // Capabilities this backend advertises to JanusGraph (TTL, TEXT/STRING mappings,
    // SINGLE/LIST/SET cardinality, custom analyzers, geo-contains).
    private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder()
            .supportsDocumentTTL()
            .setDefaultStringMapping(Mapping.TEXT)
            .supportedStringMappings(Mapping.TEXT, Mapping.STRING)
            .supportsCardinality(Cardinality.SINGLE)
            .supportsCardinality(Cardinality.LIST)
            .supportsCardinality(Cardinality.SET)
            .supportsCustomAnalyzer()
            .supportsGeoContains()
            .build();

    // Mapping from JanusGraph geo predicates to Solr spatial predicate names.
    private static final Map<Geo, String> SPATIAL_PREDICATES = spatialPredicates();
    // Snapshot of CREATE_SOLR_CLIENT_PER_REQUEST taken in the constructor; drives getSolrClient().
    private static boolean createSolrClientPerRequest;

    // Client owned by this instance (shared unless create-client-per-request is set).
    private final SolrClient solrClient;
    private final Configuration configuration;
    private final Mode mode;
    private final boolean dynFields;
    // Per-collection override of the Solr unique-key field name.
    private final Map<String, String> keyFieldIds;
    private final String ttlField;
    // Upper bound on rows fetched per query page.
    private final int batchSize;
    private final boolean waitSearcher;
    private final boolean kerberosEnabled;
| |
| public Solr6Index(final Configuration config) throws BackendException { |
| // Add Kerberos-enabled SolrHttpClientBuilder |
| HttpClientUtil.setHttpClientBuilder(new Krb5HttpClientBuilder().getBuilder()); |
| |
| Preconditions.checkArgument(config!=null); |
| configuration = config; |
| mode = Mode.parse(config.get(SOLR_MODE)); |
| kerberosEnabled = config.get(KERBEROS_ENABLED); |
| dynFields = config.get(DYNAMIC_FIELDS); |
| keyFieldIds = parseKeyFieldsForCollections(config); |
| batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE); |
| ttlField = config.get(TTL_FIELD); |
| waitSearcher = config.get(WAIT_SEARCHER); |
| |
| if (kerberosEnabled) { |
| logger.debug("Kerberos is enabled. Configuring SOLR for Kerberos."); |
| configureSolrClientsForKerberos(); |
| } else { |
| logger.debug("Kerberos is NOT enabled."); |
| logger.debug("KERBEROS_ENABLED name is " + KERBEROS_ENABLED.getName() + " and it is" + (KERBEROS_ENABLED.isOption() ? " " : " not") + " an option."); |
| logger.debug("KERBEROS_ENABLED type is " + KERBEROS_ENABLED.getType().name()); |
| } |
| |
| solrClient = createSolrClient(); |
| createSolrClientPerRequest = config.get(CREATE_SOLR_CLIENT_PER_REQUEST); |
| if(createSolrClientPerRequest) { |
| logger.info("A new Solr Client will be created for direct interation with SOLR."); |
| } else { |
| logger.info("Solr Client will be shared for direct interation with SOLR."); |
| } |
| Solr6Index.instance = this; |
| } |
| |
| public static Mode getSolrMode() { |
| Solr6Index solr6Index = Solr6Index.instance; |
| Mode ret = (solr6Index != null) ? Mode.parse(solr6Index.configuration.get(SOLR_MODE)) : null; |
| |
| if (ret == null) { |
| logger.warn("SolrMode is not set. Assuming {}", Mode.CLOUD); |
| |
| ret = Mode.CLOUD; |
| } |
| |
| return ret; |
| } |
| |
| public static SolrClient getSolrClient() { |
| if (Solr6Index.instance != null) { |
| if (createSolrClientPerRequest) { |
| logger.debug("Creating a new Solr Client."); |
| return Solr6Index.instance.createSolrClient(); |
| } else { |
| logger.debug("Returning the solr client owned by Solr6Index."); |
| return Solr6Index.instance.solrClient; |
| } |
| } else { |
| logger.debug(" No Solr6Index available. Will return null"); |
| return null; |
| } |
| } |
| |
| public static void releaseSolrClient(SolrClient solrClient) { |
| if(createSolrClientPerRequest) { |
| if (solrClient != null) { |
| try { |
| solrClient.close(); |
| |
| if(logger.isDebugEnabled()) { |
| logger.debug("Closed the solr client successfully."); |
| } |
| } catch (IOException excp) { |
| logger.warn("Failed to close SolrClient.", excp); |
| } |
| } |
| } else { |
| if(logger.isDebugEnabled()) { |
| logger.debug("Ignoring the closing of solr client as it is owned by Solr6Index."); |
| } |
| } |
| } |
| |
    /**
     * Builds a new SolrClient according to the configured mode.
     * CLOUD: a CloudSolrClient over the configured Zookeeper ensemble (optionally with a chroot),
     * backed by a load-balanced HTTP client over HTTP_URLS. HTTP: a plain load-balanced client
     * over HTTP_URLS with the configured connection/compression parameters.
     */
    private SolrClient createSolrClient() {
        if(logger.isDebugEnabled()) {
            // The throwaway Exception argument makes slf4j log a stack trace showing the caller.
            logger.debug("HttpClientBuilder = {}", HttpClientUtil.getHttpClientBuilder(), new Exception());
        }
        final ModifiableSolrParams clientParams = new ModifiableSolrParams();
        SolrClient solrClient = null;

        Mode mode = Mode.parse(configuration.get(SOLR_MODE));
        switch (mode) {
            case CLOUD:
                /* ATLAS-2920: Update JanusGraph Solr clients to use all zookeeper entries – start */
                List<String> zkHosts = new ArrayList<>();
                String chroot = null;
                String[] zkUrls = configuration.get(ZOOKEEPER_URLS);

                if (zkUrls != null) {
                    // Iterates the configured URLs in reverse: hosts end up in reverse order and the
                    // chroot is taken from the LAST-listed URL that specifies one.
                    // NOTE(review): host order should not matter to the ZK client, but confirm the
                    // "last chroot wins" rule is the intended ATLAS-2920 behavior.
                    for (int i = zkUrls.length - 1; i >= 0; i--) {
                        String zkUrl = zkUrls[i];
                        int idxChroot = zkUrl.indexOf(CHROOT_START_CHAR);

                        if (idxChroot != -1) {
                            if (chroot == null) {
                                chroot = zkUrl.substring(idxChroot);
                            }

                            // Strip the chroot so only "host:port" is handed to the ZK host list.
                            zkUrl = zkUrl.substring(0, idxChroot);
                        }

                        zkHosts.add(zkUrl);
                    }
                }
                /* ATLAS-2920: - end */

                final CloudSolrClient cloudServer = new CloudSolrClient.Builder().withZkHost(zkHosts).withZkChroot(chroot)
                        .withLBHttpSolrClientBuilder(
                                new LBHttpSolrClient.Builder()
                                        .withHttpSolrClientBuilder(new HttpSolrClient.Builder().withInvariantParams(clientParams))
                                        .withBaseSolrUrls(configuration.get(HTTP_URLS))
                        )
                        .sendUpdatesOnlyToShardLeaders()
                        .build();
                cloudServer.connect();
                solrClient = cloudServer;
                logger.info("Created solr client using Cloud based configuration.");
                break;
            case HTTP:
                // HTTP mode: tune the underlying Apache HttpClient from the index configuration.
                clientParams.add(HttpClientUtil.PROP_ALLOW_COMPRESSION, configuration.get(HTTP_ALLOW_COMPRESSION).toString());
                clientParams.add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, configuration.get(HTTP_CONNECTION_TIMEOUT).toString());
                clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, configuration.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString());
                clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS, configuration.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString());
                final HttpClient client = HttpClientUtil.createClient(clientParams);
                solrClient = new LBHttpSolrClient.Builder()
                        .withHttpClient(client)
                        .withBaseSolrUrls(configuration.get(HTTP_URLS))
                        .build();
                logger.info("Created solr client using HTTP based configuration.");
                break;
            default:
                throw new IllegalArgumentException("Unsupported Solr operation mode: " + mode);
        }
        return solrClient;
    }
| |
| private void configureSolrClientsForKerberos() throws PermanentBackendException { |
| String kerberosConfig = System.getProperty("java.security.auth.login.config"); |
| if(kerberosConfig == null) { |
| throw new PermanentBackendException("Unable to configure kerberos for solr client. System property 'java.security.auth.login.config' is not set."); |
| } |
| logger.debug("Using kerberos configuration file located at '{}'.", kerberosConfig); |
| try(Krb5HttpClientBuilder krbBuild = new Krb5HttpClientBuilder()) { |
| |
| SolrHttpClientBuilder kb = krbBuild.getBuilder(); |
| HttpClientUtil.setHttpClientBuilder(kb); |
| HttpRequestInterceptor bufferedEntityInterceptor = new HttpRequestInterceptor() { |
| @Override |
| public void process(HttpRequest request, HttpContext context) throws HttpException, IOException { |
| if(request instanceof HttpEntityEnclosingRequest) { |
| HttpEntityEnclosingRequest enclosingRequest = ((HttpEntityEnclosingRequest) request); |
| HttpEntity requestEntity = enclosingRequest.getEntity(); |
| enclosingRequest.setEntity(new BufferedHttpEntity(requestEntity)); |
| } |
| } |
| }; |
| HttpClientUtil.addRequestInterceptor(bufferedEntityInterceptor); |
| |
| HttpRequestInterceptor preemptiveAuth = new PreemptiveAuth(new KerberosScheme()); |
| HttpClientUtil.addRequestInterceptor(preemptiveAuth); |
| } |
| } |
| |
| private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException { |
| final Map<String, String> keyFieldNames = new HashMap<>(); |
| final String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES) ? config.get(KEY_FIELD_NAMES) : new String[0]; |
| for (final String collectionFieldStatement : collectionFieldStatements) { |
| final String[] parts = collectionFieldStatement.trim().split("="); |
| if (parts.length != 2) { |
| throw new PermanentBackendException( |
| "Unable to parse the collection name / key field name pair. It should be of the format collection=field"); |
| } |
| final String collectionName = parts[0]; |
| final String keyFieldName = parts[1]; |
| keyFieldNames.put(collectionName, keyFieldName); |
| } |
| return keyFieldNames; |
| } |
| |
| private String getKeyFieldId(String collection) { |
| String field = keyFieldIds.get(collection); |
| if (field==null) field = DEFAULT_ID_FIELD; |
| return field; |
| } |
| |
| /** |
| * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to |
| * support searching. This means that you will need to modify the solr schema with the |
| * appropriate field definitions in order to work properly. If you have a running instance |
| * of Solr and you modify its schema with new fields, don't forget to re-index! |
| * @param store Index store |
| * @param key New key to register |
| * @param information data type to register for the key |
| * @param tx enclosing transaction |
| * @throws BackendException in case an exception is thrown when |
| * creating a collection. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public void register(String store, String key, KeyInformation information, BaseTransaction tx) |
| throws BackendException { |
| if (mode== Mode.CLOUD) { |
| final CloudSolrClient client = (CloudSolrClient) solrClient; |
| try { |
| createCollectionIfNotExists(client, configuration, store); |
| } catch (final IOException | SolrServerException | InterruptedException | KeeperException e) { |
| throw new PermanentBackendException(e); |
| } |
| } |
| //Since all data types must be defined in the schema.xml, pre-registering a type does not work |
| //But we check Analyse feature |
| String analyzer = ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null); |
| if (analyzer != null) { |
| //If the key have a tokenizer, we try to get it by reflection |
| // some referred classes might not be able to be found via SystemClassLoader |
| // since they might be associated with other classloader, in this situation |
| // ClassNotFound exception will be thrown. instead of using SystemClassLoader |
| // for all classes, we find its classloader first and then load the class, please |
| // call - instantiateUsingClassLoader() |
| try { |
| ((Constructor<Tokenizer>) ClassLoader.getSystemClassLoader().loadClass(analyzer) |
| .getConstructor()).newInstance(); |
| } catch (final ReflectiveOperationException e) { |
| throw new PermanentBackendException(e.getMessage(),e); |
| } |
| } |
| analyzer = ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null); |
| if (analyzer != null) { |
| //If the key have a tokenizer, we try to get it by reflection |
| try { |
| ((Constructor<Tokenizer>) ClassLoader.getSystemClassLoader().loadClass(analyzer) |
| .getConstructor()).newInstance(); |
| } catch (final ReflectiveOperationException e) { |
| throw new PermanentBackendException(e.getMessage(),e); |
| } |
| } |
| } |
| |
| private void instantiateUsingClassLoader(String analyzer) throws PermanentBackendException { |
| if (analyzer == null) return; |
| try { |
| Class analyzerClass = Class.forName(analyzer); |
| ClassLoader cl = analyzerClass.getClassLoader(); |
| ((Constructor<Tokenizer>) cl.loadClass(analyzer).getConstructor()).newInstance(); |
| } catch (final ReflectiveOperationException e) { |
| throw new PermanentBackendException(e.getMessage(),e); |
| } |
| } |
| |
    /**
     * Applies a batch of index mutations grouped by store (Solr collection). Per document:
     * whole-document deletions are collected, field-level removals go out immediately as atomic
     * updates, and additions/updates are collected; per collection, deletions are committed
     * before changes.
     *
     * @param mutations   per-collection map of document id to its mutation
     * @param information retriever used to look up field cardinality
     * @param tx          enclosing transaction (not consulted; requests go straight to Solr)
     * @throws BackendException if any Solr request fails
     */
    @Override
    public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever information,
                       BaseTransaction tx) throws BackendException {
        logger.debug("Mutating SOLR");
        try {
            for (final Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
                final String collectionName = stores.getKey();
                final String keyIdField = getKeyFieldId(collectionName);

                final List<String> deleteIds = new ArrayList<>();
                final Collection<SolrInputDocument> changes = new ArrayList<>();

                for (final Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
                    final String docId = entry.getKey();
                    final IndexMutation mutation = entry.getValue();
                    // Sanity: a mutation cannot be simultaneously new and deleted, a new document
                    // cannot carry deletions, and a deleted document cannot carry additions.
                    Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
                    Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
                    Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());

                    //Handle any deletions
                    if (mutation.hasDeletions()) {
                        if (mutation.isDeleted()) {
                            logger.trace("Deleting entire document {}", docId);
                            deleteIds.add(docId);
                        } else {
                            // Field-level removal: skip any field value that is re-added by this
                            // same mutation so it is not deleted and then immediately restored.
                            final List<IndexEntry> fieldDeletions = new ArrayList<>(mutation.getDeletions());
                            if (mutation.hasAdditions()) {
                                for (final IndexEntry indexEntry : mutation.getAdditions()) {
                                    fieldDeletions.remove(indexEntry);
                                }
                            }
                            handleRemovalsFromIndex(collectionName, keyIdField, docId, fieldDeletions, information);
                        }
                    }

                    if (mutation.hasAdditions()) {
                        final int ttl = mutation.determineTTL();

                        final SolrInputDocument doc = new SolrInputDocument();
                        doc.setField(keyIdField, docId);

                        final boolean isNewDoc = mutation.isNew();

                        if (isNewDoc)
                            logger.trace("Adding new document {}", docId);
                        final Map<String, Object> adds = collectFieldValues(mutation.getAdditions(), collectionName,
                                information);
                        // If cardinality is not single then we should use the "add" operation to update
                        // the index so we don't overwrite existing values.
                        adds.keySet().forEach(v-> {
                            final KeyInformation keyInformation = information.get(collectionName, v);
                            final String solrOp = keyInformation.getCardinality() == Cardinality.SINGLE ? "set" : "add";
                            // Existing documents use Solr atomic-update syntax ({"set"/"add": value});
                            // brand-new documents set field values directly.
                            doc.setField(v, isNewDoc ? adds.get(v) :
                                new HashMap<String, Object>(1) {{put(solrOp, adds.get(v));}}
                            );
                        });
                        if (ttl>0) {
                            Preconditions.checkArgument(isNewDoc,
                                    "Solr only supports TTL on new documents [%s]", docId);
                            doc.setField(ttlField, String.format("+%dSECONDS", ttl));
                        }
                        changes.add(doc);
                    }
                }

                commitDeletes(collectionName, deleteIds);
                commitChanges(collectionName, changes);
            }
        } catch (final IllegalArgumentException e) {
            throw new PermanentBackendException("Unable to complete query on Solr.", e);
        } catch (final Exception e) {
            throw storageException(e);
        }
    }
| |
| private void handleRemovalsFromIndex(String collectionName, String keyIdField, String docId, |
| List<IndexEntry> fieldDeletions, KeyInformation.IndexRetriever information) |
| throws SolrServerException, IOException, BackendException { |
| final Map<String, String> fieldDeletes = new HashMap<>(1); |
| fieldDeletes.put("set", null); |
| final SolrInputDocument doc = new SolrInputDocument(); |
| doc.addField(keyIdField, docId); |
| for(final IndexEntry v: fieldDeletions) { |
| final KeyInformation keyInformation = information.get(collectionName, v.field); |
| // If the cardinality is a Set or List, we just need to remove the individual value |
| // received in the mutation and not set the field to null, but we still consolidate the values |
| // in the event of multiple removals in one mutation. |
| final Map<String, Object> deletes = collectFieldValues(fieldDeletions, collectionName, information); |
| deletes.keySet().forEach(vertex -> { |
| final Map<String, Object> remove; |
| if (keyInformation.getCardinality() == Cardinality.SINGLE) { |
| remove = (Map) fieldDeletes; |
| } else { |
| remove = new HashMap<>(1); |
| remove.put("remove", deletes.get(vertex)); |
| } |
| doc.setField(vertex, remove); |
| }); |
| } |
| |
| final UpdateRequest singleDocument = newUpdateRequest(); |
| singleDocument.add(doc); |
| solrClient.request(singleDocument, collectionName); |
| |
| } |
| |
| private Object convertValue(Object value) throws BackendException { |
| if (value instanceof Geoshape) { |
| return GeoToWktConverter.convertToWktString((Geoshape) value); |
| } |
| if (value instanceof UUID) { |
| return value.toString(); |
| } |
| if(value instanceof Instant) { |
| if(Math.floorMod(((Instant) value).getNano(), 1000000) != 0) { |
| throw new IllegalArgumentException("Solr indexes do not support nanoseconds"); |
| } |
| return new Date(((Instant) value).toEpochMilli()); |
| } |
| return value; |
| } |
| |
    /**
     * Full re-index restore. For each store: documents whose content is null or empty are
     * deleted; all other documents are re-written outright (plain field sets, no atomic-update
     * syntax). Deletions are committed before the rewritten documents.
     *
     * @throws BackendException always wrapped as TemporaryBackendException so the restore can be retried
     */
    @Override
    public void restore(Map<String, Map<String, List<IndexEntry>>> documents,
                        KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
        try {
            for (final Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) {
                final String collectionName = stores.getKey();

                final List<String> deleteIds = new ArrayList<>();
                final List<SolrInputDocument> newDocuments = new ArrayList<>();

                for (final Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) {
                    final String docID = entry.getKey();
                    final List<IndexEntry> content = entry.getValue();

                    // Empty content means the document should no longer exist.
                    if (content == null || content.isEmpty()) {
                        if (logger.isTraceEnabled())
                            logger.trace("Deleting document [{}]", docID);

                        deleteIds.add(docID);
                        continue;
                    }
                    final SolrInputDocument doc = new SolrInputDocument();
                    doc.setField(getKeyFieldId(collectionName), docID);
                    final Map<String, Object> adds = collectFieldValues(content, collectionName, information);
                    adds.forEach(doc::setField);
                    newDocuments.add(doc);
                }
                commitDeletes(collectionName, deleteIds);
                commitChanges(collectionName, newDocuments);
            }
        } catch (final Exception e) {
            throw new TemporaryBackendException("Could not restore Solr index", e);
        }
    }
| |
| // This method will create a map of field ids to values. In the case of multiValued fields, |
| // it will consolidate all the values into one List or Set so it can be updated with a single Solr operation |
| private Map<String, Object> collectFieldValues(List<IndexEntry> content, String collectionName, |
| KeyInformation.IndexRetriever information) throws BackendException { |
| final Map<String, Object> docs = new HashMap<>(); |
| for (final IndexEntry addition: content) { |
| final KeyInformation keyInformation = information.get(collectionName, addition.field); |
| switch (keyInformation.getCardinality()) { |
| case SINGLE: |
| docs.put(addition.field, convertValue(addition.value)); |
| break; |
| case SET: |
| if (!docs.containsKey(addition.field)) { |
| docs.put(addition.field, new HashSet<>()); |
| } |
| ((Set<Object>) docs.get(addition.field)).add(convertValue(addition.value)); |
| break; |
| case LIST: |
| if (!docs.containsKey(addition.field)) { |
| docs.put(addition.field, new ArrayList<>()); |
| } |
| ((List<Object>) docs.get(addition.field)).add(convertValue(addition.value)); |
| break; |
| } |
| } |
| return docs; |
| } |
| |
| private void commitChanges(String collectionName, |
| Collection<SolrInputDocument> documents) throws SolrServerException, IOException { |
| if (documents.size() == 0) return; |
| |
| try { |
| solrClient.request(newUpdateRequest().add(documents), collectionName); |
| } catch (final HttpSolrClient.RemoteSolrException rse) { |
| logger.error("Unable to save documents to Solr as one of the shape objects stored were not compatible with Solr.", rse); |
| logger.error("Details in failed document batch: "); |
| for (final SolrInputDocument d : documents) { |
| final Collection<String> fieldNames = d.getFieldNames(); |
| for (final String name : fieldNames) { |
| logger.error(name + ":" + d.getFieldValue(name)); |
| } |
| } |
| |
| throw rse; |
| } |
| } |
| |
| private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException { |
| if (deleteIds.size() == 0) return; |
| solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName); |
| } |
| |
    /**
     * Executes an IndexQuery and streams back matching document ids (the key field values).
     * The match is expressed as a filter query over a match-all query; results are fetched in
     * pages of at most the configured batch size, capped by the query limit when present.
     */
    @Override
    public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever information,
                                BaseTransaction tx) throws BackendException {
        final String collection = query.getStore();
        final String keyIdField = getKeyFieldId(collection);
        final SolrQuery solrQuery = new SolrQuery("*:*");
        // Only the id field is fetched; the filter below does the actual matching.
        solrQuery.set(CommonParams.FL, keyIdField);
        final String queryFilter = buildQueryFilter(query.getCondition(), information.get(collection));
        solrQuery.addFilterQuery(queryFilter);
        if (!query.getOrder().isEmpty()) {
            addOrderToQuery(solrQuery, query.getOrder());
        }
        solrQuery.setStart(0);
        if (query.hasLimit()) {
            solrQuery.setRows(Math.min(query.getLimit(), batchSize));
        } else {
            solrQuery.setRows(batchSize);
        }
        return executeQuery(query.hasLimit() ? query.getLimit() : null, 0, collection, solrQuery,
                doc -> doc.getFieldValue(keyIdField).toString());
    }
| |
| @Override |
| public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException { |
| try { |
| String collection = query.getStore(); |
| String keyIdField = this.getKeyFieldId(collection); |
| SolrQuery solrQuery = new SolrQuery("*:*"); |
| solrQuery.set("fl", new String[]{keyIdField}); |
| String queryFilter = this.buildQueryFilter(query.getCondition(), information.get(collection)); |
| solrQuery.addFilterQuery(new String[]{queryFilter}); |
| QueryResponse response = this.solrClient.query(collection, solrQuery); |
| logger.debug("Executed query [{}] in {} ms", query, response.getElapsedTime()); |
| return response.getResults().getNumFound(); |
| } catch (IOException ex) { |
| logger.error("Query did not complete : ", ex); |
| throw new PermanentBackendException(ex); |
| } catch (SolrServerException ex) { |
| logger.error("Unable to query Solr index.", ex); |
| throw new PermanentBackendException(ex); |
| } |
| } |
| |
| private void addOrderToQuery(SolrQuery solrQuery, List<IndexQuery.OrderEntry> orders) { |
| for (final IndexQuery.OrderEntry order1 : orders) { |
| final String item = order1.getKey(); |
| final SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? SolrQuery.ORDER.asc : SolrQuery.ORDER.desc; |
| solrQuery.addSort(new SolrQuery.SortClause(item, order)); |
| } |
| } |
| |
    /**
     * Wraps a prepared SolrQuery into a lazy result stream. SolrResultIterator presumably pages
     * through results solrQuery.getRows() at a time up to the given limit — see that class for
     * the paging contract. Checked Solr/IO failures raised during iteration surface here as
     * their unchecked wrappers and are rethrown as PermanentBackendException.
     *
     * @param limit    maximum number of results, or null for unbounded
     * @param offset   starting position within the result set
     * @param function maps each SolrDocument to the stream's element type
     */
    private <E> Stream<E> executeQuery(Integer limit, int offset, String collection, SolrQuery solrQuery,
                                       Function<SolrDocument, E> function) throws PermanentBackendException {
        try {
            final SolrResultIterator<E> resultIterator = new SolrResultIterator<>(solrClient, limit, offset,
                    solrQuery.getRows(), collection, solrQuery, function);
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED),
                    false);
        } catch (final IOException | UncheckedIOException e) {
            logger.error("Query did not complete : ", e);
            throw new PermanentBackendException(e);
        } catch (final SolrServerException | UncheckedSolrException e) {
            logger.error("Unable to query Solr index.", e);
            throw new PermanentBackendException(e);
        }
    }
| |
| |
| private SolrQuery runCommonQuery(RawQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx, |
| String collection, String keyIdField) throws BackendException { |
| final SolrQuery solrQuery = new SolrQuery(query.getQuery()) |
| .addField(keyIdField) |
| .setIncludeScore(true) |
| .setStart(query.getOffset()); |
| if (query.hasLimit()) { |
| solrQuery.setRows(Math.min(query.getLimit(), batchSize)); |
| } else { |
| solrQuery.setRows(batchSize); |
| } |
| if (!query.getOrders().isEmpty()) { |
| addOrderToQuery(solrQuery, query.getOrders()); |
| } |
| |
| for(final Parameter parameter: query.getParameters()) { |
| if (parameter.value() instanceof String[]) { |
| solrQuery.setParam(parameter.key(), (String[]) parameter.value()); |
| } else if (parameter.value() instanceof String) { |
| solrQuery.setParam(parameter.key(), (String) parameter.value()); |
| } |
| } |
| return solrQuery; |
| } |
| |
    /**
     * Executes a raw Solr query string and streams back (document id, relevance score) pairs.
     * The query itself is built by runCommonQuery(); each result carries the Solr "score" field.
     */
    @Override
    public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever information,
                                                 BaseTransaction tx) throws BackendException {
        final String collection = query.getStore();
        final String keyIdField = getKeyFieldId(collection);
        return executeQuery(query.hasLimit() ? query.getLimit() : null, query.getOffset(), collection,
                runCommonQuery(query, information, tx, collection, keyIdField), doc -> {
                    final double score = Double.parseDouble(doc.getFieldValue("score").toString());
                    return new RawQuery.Result<>(doc.getFieldValue(keyIdField).toString(), score);
                });
    }
| |
| @Override |
| public Long totals(RawQuery query, KeyInformation.IndexRetriever information, |
| BaseTransaction tx) throws BackendException { |
| try { |
| final String collection = query.getStore(); |
| final String keyIdField = getKeyFieldId(collection); |
| final QueryResponse response = solrClient.query(collection, runCommonQuery(query, information, tx, |
| collection, keyIdField)); |
| logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime()); |
| return response.getResults().getNumFound(); |
| } catch (final IOException e) { |
| logger.error("Query did not complete : ", e); |
| throw new PermanentBackendException(e); |
| } catch (final SolrServerException e) { |
| logger.error("Unable to query Solr index.", e); |
| throw new PermanentBackendException(e); |
| } |
| } |
| |
| private static String escapeValue(Object value) { |
| return ClientUtils.escapeQueryChars(value.toString()); |
| } |
| |
| public String buildQueryFilter(Condition<JanusGraphElement> condition, KeyInformation.StoreRetriever information) { |
| if (condition instanceof PredicateCondition) { |
| final PredicateCondition<String, JanusGraphElement> atom |
| = (PredicateCondition<String, JanusGraphElement>) condition; |
| final Object value = atom.getValue(); |
| final String key = atom.getKey(); |
| final JanusGraphPredicate predicate = atom.getPredicate(); |
| |
| if (value == null && predicate == Cmp.NOT_EQUAL) { |
| return key + ":*"; |
| } else if (value instanceof Number) { |
| final String queryValue = escapeValue(value); |
| Preconditions.checkArgument(predicate instanceof Cmp, |
| "Relation not supported on numeric types: %s", predicate); |
| final Cmp numRel = (Cmp) predicate; |
| switch (numRel) { |
| case EQUAL: |
| return (key + ":" + queryValue); |
| case NOT_EQUAL: |
| return ("-" + key + ":" + queryValue); |
| case LESS_THAN: |
| //use right curly to mean up to but not including value |
| return (key + ":[* TO " + queryValue + "}"); |
| case LESS_THAN_EQUAL: |
| return (key + ":[* TO " + queryValue + "]"); |
| case GREATER_THAN: |
| //use left curly to mean greater than but not including value |
| return (key + ":{" + queryValue + " TO *]"); |
| case GREATER_THAN_EQUAL: |
| return (key + ":[" + queryValue + " TO *]"); |
| default: throw new IllegalArgumentException("Unexpected relation: " + numRel); |
| } |
| } else if (value instanceof String) { |
| final Mapping map = getStringMapping(information.get(key)); |
| assert map==Mapping.TEXT || map==Mapping.STRING; |
| |
| if (map==Mapping.TEXT && !(Text.HAS_CONTAINS.contains(predicate) || predicate instanceof Cmp)) |
| throw new IllegalArgumentException("Text mapped string values only support CONTAINS and Compare queries and not: " + predicate); |
| if (map==Mapping.STRING && Text.HAS_CONTAINS.contains(predicate)) |
| throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + predicate); |
| |
| //Special case |
| if (predicate == Text.CONTAINS) { |
| return tokenize(information, value, key, predicate, |
| ParameterType.TEXT_ANALYZER.findParameter(information.get(key).getParameters(), null)); |
| } else if (predicate == Text.PREFIX || predicate == Text.CONTAINS_PREFIX) { |
| return (key + ":" + escapeValue(value) + "*"); |
| } else if (predicate == Text.REGEX || predicate == Text.CONTAINS_REGEX) { |
| return (key + ":/" + value + "/"); |
| } else if (predicate == Cmp.EQUAL || predicate == Cmp.NOT_EQUAL) { |
| final String tokenizer = |
| ParameterType.STRING_ANALYZER.findParameter(information.get(key).getParameters(), null); |
| if (tokenizer != null) { |
| return tokenize(information, value, key, predicate, tokenizer); |
| } else if (predicate == Cmp.EQUAL) { |
| return (key + ":\"" + escapeValue(value) + "\""); |
| } else { // Cmp.NOT_EQUAL case |
| return ("-" + key + ":\"" + escapeValue(value) + "\""); |
| } |
| } else if (predicate == Text.FUZZY || predicate == Text.CONTAINS_FUZZY) { |
| return (key + ":"+escapeValue(value)+"~"+Text.getMaxEditDistance(value.toString())); |
| } else if (predicate == Cmp.LESS_THAN) { |
| return (key + ":[* TO \"" + escapeValue(value) + "\"}"); |
| } else if (predicate == Cmp.LESS_THAN_EQUAL) { |
| return (key + ":[* TO \"" + escapeValue(value) + "\"]"); |
| } else if (predicate == Cmp.GREATER_THAN) { |
| return (key + ":{\"" + escapeValue(value) + "\" TO *]"); |
| } else if (predicate == Cmp.GREATER_THAN_EQUAL) { |
| return (key + ":[\"" + escapeValue(value) + "\" TO *]"); |
| } else { |
| throw new IllegalArgumentException("Relation is not supported for string value: " + predicate); |
| } |
| } else if (value instanceof Geoshape) { |
| final Mapping map = Mapping.getMapping(information.get(key)); |
| Preconditions.checkArgument(predicate instanceof Geo && predicate != Geo.DISJOINT, |
| "Relation not supported on geo types: %s", predicate); |
| Preconditions.checkArgument(map == Mapping.PREFIX_TREE || predicate == Geo.WITHIN || predicate == Geo.INTERSECT, |
| "Relation not supported on geopoint types: %s", predicate); |
| final Geoshape geo = (Geoshape)value; |
| if (geo.getType() == Geoshape.Type.CIRCLE && (predicate == Geo.INTERSECT || map == Mapping.DEFAULT)) { |
| final Geoshape.Point center = geo.getPoint(); |
| return ("{!geofilt sfield=" + key + |
| " pt=" + center.getLatitude() + "," + center.getLongitude() + |
| " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers |
| } else if (geo.getType() == Geoshape.Type.BOX && (predicate == Geo.INTERSECT || map == Mapping.DEFAULT)) { |
| final Geoshape.Point southwest = geo.getPoint(0); |
| final Geoshape.Point northeast = geo.getPoint(1); |
| return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() + |
| " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]"); |
| } else if (map == Mapping.PREFIX_TREE) { |
| return key + ":\"" + SPATIAL_PREDICATES.get(predicate) + "(" + geo + ")\" distErrPct=0"; |
| } else { |
| throw new IllegalArgumentException("Unsupported or invalid search shape type: " + geo.getType()); |
| } |
| } else if (value instanceof Date || value instanceof Instant) { |
| final String s = value.toString(); |
| final String queryValue = escapeValue(value instanceof Date ? toIsoDate((Date) value) : value.toString()); |
| Preconditions.checkArgument(predicate instanceof Cmp, "Relation not supported on date types: %s", predicate); |
| final Cmp numRel = (Cmp) predicate; |
| |
| switch (numRel) { |
| case EQUAL: |
| return (key + ":" + queryValue); |
| case NOT_EQUAL: |
| return ("-" + key + ":" + queryValue); |
| case LESS_THAN: |
| //use right curly to mean up to but not including value |
| return (key + ":[* TO " + queryValue + "}"); |
| case LESS_THAN_EQUAL: |
| return (key + ":[* TO " + queryValue + "]"); |
| case GREATER_THAN: |
| //use left curly to mean greater than but not including value |
| return (key + ":{" + queryValue + " TO *]"); |
| case GREATER_THAN_EQUAL: |
| return (key + ":[" + queryValue + " TO *]"); |
| default: throw new IllegalArgumentException("Unexpected relation: " + numRel); |
| } |
| } else if (value instanceof Boolean) { |
| final Cmp numRel = (Cmp) predicate; |
| final String queryValue = escapeValue(value); |
| switch (numRel) { |
| case EQUAL: |
| return (key + ":" + queryValue); |
| case NOT_EQUAL: |
| return ("-" + key + ":" + queryValue); |
| default: |
| throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL"); |
| } |
| } else if (value instanceof UUID) { |
| if (predicate == Cmp.EQUAL) { |
| return (key + ":\"" + escapeValue(value) + "\""); |
| } else if (predicate == Cmp.NOT_EQUAL) { |
| return ("-" + key + ":\"" + escapeValue(value) + "\""); |
| } else { |
| throw new IllegalArgumentException("Relation is not supported for uuid value: " + predicate); |
| } |
| } else throw new IllegalArgumentException("Unsupported type: " + value); |
| } else if (condition instanceof Not) { |
| final String sub = buildQueryFilter(((Not)condition).getChild(),information); |
| if (StringUtils.isNotBlank(sub)) return "-("+sub+")"; |
| else return ""; |
| } else if (condition instanceof And) { |
| final int numChildren = ((And) condition).size(); |
| final StringBuilder sb = new StringBuilder(); |
| for (final Condition<JanusGraphElement> c : condition.getChildren()) { |
| final String sub = buildQueryFilter(c, information); |
| |
| if (StringUtils.isBlank(sub)) |
| continue; |
| |
| // we don't have to add "+" which means AND iff |
| // a. it's a NOT query, |
| // b. expression is a single statement in the AND. |
| if (!sub.startsWith("-") && numChildren > 1) |
| sb.append("+"); |
| |
| sb.append(sub).append(" "); |
| } |
| return sb.toString(); |
| } else if (condition instanceof Or) { |
| final StringBuilder sb = new StringBuilder(); |
| int element=0; |
| for (final Condition<JanusGraphElement> c : condition.getChildren()) { |
| final String sub = buildQueryFilter(c,information); |
| if (StringUtils.isBlank(sub)) continue; |
| if (element==0) sb.append("("); |
| else sb.append(" OR "); |
| sb.append(sub); |
| element++; |
| } |
| if (element>0) sb.append(")"); |
| return sb.toString(); |
| } else { |
| throw new IllegalArgumentException("Invalid condition: " + condition); |
| } |
| } |
| |
| private String tokenize(KeyInformation.StoreRetriever information, Object value, String key, |
| JanusGraphPredicate janusgraphPredicate, String tokenizer) { |
| List<String> terms; |
| if(tokenizer != null){ |
| terms = customTokenize(tokenizer, (String) value); |
| } else { |
| terms = Text.tokenize((String) value); |
| } |
| if (terms.isEmpty()) { |
| return ""; |
| } else if (terms.size() == 1) { |
| if (janusgraphPredicate == Cmp.NOT_EQUAL) { |
| return ("-" + key + ":(" + escapeValue(terms.get(0)) + ")"); |
| } else { |
| return (key + ":(" + escapeValue(terms.get(0)) + ")"); |
| } |
| } else { |
| final And<JanusGraphElement> andTerms = new And<>(); |
| for (final String term : terms) { |
| andTerms.add(new PredicateCondition<>(key, janusgraphPredicate, term)); |
| } |
| return buildQueryFilter(andTerms, information); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private List<String> customTokenize(String tokenizerClass, String value){ |
| CachingTokenFilter stream = null; |
| try { |
| final List<String> terms = new ArrayList<>(); |
| final Tokenizer tokenizer |
| = ((Constructor<Tokenizer>) ClassLoader.getSystemClassLoader().loadClass(tokenizerClass) |
| .getConstructor()).newInstance(); |
| tokenizer.setReader(new StringReader(value)); |
| stream = new CachingTokenFilter(tokenizer); |
| final TermToBytesRefAttribute termAtt = stream.getAttribute(TermToBytesRefAttribute.class); |
| stream.reset(); |
| while (stream.incrementToken()) { |
| terms.add(termAtt.getBytesRef().utf8ToString()); |
| } |
| return terms; |
| } catch ( ReflectiveOperationException | IOException e) { |
| throw new IllegalArgumentException(e.getMessage(),e); |
| } finally { |
| IOUtils.closeQuietly(stream); |
| } |
| } |
| |
| private String toIsoDate(Date value) { |
| final TimeZone tz = TimeZone.getTimeZone("UTC"); |
| final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); |
| df.setTimeZone(tz); |
| return df.format(value); |
| } |
| |
| /** |
| * Solr handles all transactions on the server-side. That means all |
| * commit, optimize, or rollback applies since the last commit/optimize/rollback. |
| * Solr documentation recommends best way to update Solr is in one process to avoid |
| * race conditions. |
| * |
| * @return New Transaction Handle |
| */ |
| @Override |
| public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) { |
| return new DefaultTransaction(config); |
| } |
| |
@Override
public void close() throws BackendException {
    // Releases the underlying SolrJ client. A close failure is surfaced as
    // temporary: retrying after a transient I/O problem may succeed.
    logger.trace("Shutting down connection to Solr {}", solrClient);
    try {
        solrClient.close();
    } catch (final IOException e) {
        throw new TemporaryBackendException(e);
    }
}
| |
| @Override |
| public void clearStorage() throws BackendException { |
| try { |
| if (mode!= Mode.CLOUD) { |
| logger.error("Operation only supported for SolrCloud. Cores must be deleted manually through the Solr API when using HTTP mode."); |
| return; |
| } |
| logger.debug("Clearing storage from Solr: {}", solrClient); |
| final ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader(); |
| zkStateReader.forciblyRefreshAllClusterStateSlow(); |
| final ClusterState clusterState = zkStateReader.getClusterState(); |
| for (final String collection : clusterState.getCollectionsMap().keySet()) { |
| logger.debug("Clearing collection [{}] in Solr",collection); |
| // Collection is not dropped because it may have been created externally |
| final UpdateRequest deleteAll = newUpdateRequest(); |
| deleteAll.deleteByQuery("*:*"); |
| solrClient.request(deleteAll, collection); |
| } |
| |
| } catch (final SolrServerException e) { |
| logger.error("Unable to clear storage from index due to server error on Solr.", e); |
| throw new PermanentBackendException(e); |
| } catch (final IOException e) { |
| logger.error("Unable to clear storage from index due to low-level I/O error.", e); |
| throw new PermanentBackendException(e); |
| } catch (final Exception e) { |
| logger.error("Unable to clear storage from index due to general error.", e); |
| throw new PermanentBackendException(e); |
| } |
| } |
| |
| @Override |
| public boolean supports(KeyInformation information, JanusGraphPredicate predicate) { |
| final Class<?> dataType = information.getDataType(); |
| final Mapping mapping = Mapping.getMapping(information); |
| if (mapping!=Mapping.DEFAULT && !AttributeUtils.isString(dataType) && |
| !(mapping==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType))) return false; |
| |
| if (Number.class.isAssignableFrom(dataType)) { |
| return predicate instanceof Cmp; |
| } else if (dataType == Geoshape.class) { |
| switch(mapping) { |
| case DEFAULT: |
| return predicate == Geo.WITHIN || predicate == Geo.INTERSECT; |
| case PREFIX_TREE: |
| return predicate == Geo.INTERSECT || predicate == Geo.WITHIN || predicate == Geo.CONTAINS; |
| } |
| } else if (AttributeUtils.isString(dataType)) { |
| switch(mapping) { |
| case DEFAULT: |
| case TEXT: |
| return predicate == Text.CONTAINS || predicate == Text.CONTAINS_PREFIX |
| || predicate == Text.CONTAINS_REGEX || predicate == Text.CONTAINS_FUZZY; |
| case STRING: |
| return predicate instanceof Cmp || predicate==Text.REGEX || predicate==Text.PREFIX || predicate == Text.FUZZY; |
| // case TEXTSTRING: |
| // return (janusgraphPredicate instanceof Text) || janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate==Cmp.NOT_EQUAL; |
| } |
| } else if (dataType == Date.class || dataType == Instant.class) { |
| return predicate instanceof Cmp; |
| } else if (dataType == Boolean.class) { |
| return predicate == Cmp.EQUAL || predicate == Cmp.NOT_EQUAL; |
| } else if (dataType == UUID.class) { |
| return predicate == Cmp.EQUAL || predicate==Cmp.NOT_EQUAL; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean supports(KeyInformation information) { |
| final Class<?> dataType = information.getDataType(); |
| final Mapping mapping = Mapping.getMapping(information); |
| if (Number.class.isAssignableFrom(dataType) || dataType == Date.class || dataType == Instant.class |
| || dataType == Boolean.class || dataType == UUID.class) { |
| return mapping == Mapping.DEFAULT; |
| } else if (AttributeUtils.isString(dataType)) { |
| return mapping == Mapping.DEFAULT || mapping == Mapping.TEXT || mapping == Mapping.STRING; |
| } else if (AttributeUtils.isGeo(dataType)) { |
| return mapping == Mapping.DEFAULT || mapping == Mapping.PREFIX_TREE; |
| } |
| return false; |
| } |
| |
| @Override |
| public String mapKey2Field(String key, KeyInformation keyInfo) { |
| IndexProvider.checkKeyValidity(key); |
| key = key.replace(' ', REPLACEMENT_CHAR); |
| |
| if (!dynFields) return key; |
| if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key; |
| String postfix; |
| final Class dataType = keyInfo.getDataType(); |
| if (AttributeUtils.isString(dataType)) { |
| final Mapping map = getStringMapping(keyInfo); |
| switch (map) { |
| case TEXT: postfix = "_t"; break; |
| case STRING: postfix = "_s"; break; |
| default: throw new IllegalArgumentException("Unsupported string mapping: " + map); |
| } |
| } else if (AttributeUtils.isWholeNumber(dataType)) { |
| if (dataType.equals(Long.class)) postfix = "_l"; |
| else postfix = "_i"; |
| } else if (AttributeUtils.isDecimal(dataType)) { |
| if (dataType.equals(Float.class)) postfix = "_f"; |
| else postfix = "_d"; |
| } else if (dataType.equals(BigInteger.class)) { |
| postfix = "_bi"; |
| } else if (dataType.equals(BigDecimal.class)) { |
| postfix = "_bd"; |
| } else if (dataType.equals(Geoshape.class)) { |
| postfix = "_g"; |
| } else if (dataType.equals(Date.class) || dataType.equals(Instant.class)) { |
| postfix = "_dt"; |
| } else if (dataType.equals(Boolean.class)) { |
| postfix = "_b"; |
| } else if (dataType.equals(UUID.class)) { |
| postfix = "_uuid"; |
| } else throw new IllegalArgumentException("Unsupported data type ["+dataType+"] for field: " + key); |
| |
| if (keyInfo.getCardinality() == Cardinality.SET || keyInfo.getCardinality() == Cardinality.LIST) { |
| postfix += "s"; |
| } |
| return key+postfix; |
| } |
| |
@Override
public IndexFeatures getFeatures() {
    // Capability descriptor is a shared constant; see SOLR_FEATURES for the exact flags.
    return SOLR_FEATURES;
}
| |
| @Override |
| public boolean exists() throws BackendException { |
| if (mode!= Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud"); |
| final CloudSolrClient server = (CloudSolrClient) solrClient; |
| try { |
| final ZkStateReader zkStateReader = server.getZkStateReader(); |
| zkStateReader.forciblyRefreshAllClusterStateSlow(); |
| final ClusterState clusterState = zkStateReader.getClusterState(); |
| final Map<String, DocCollection> collections = clusterState.getCollectionsMap(); |
| return collections != null && !collections.isEmpty(); |
| } catch (KeeperException | InterruptedException e) { |
| throw new PermanentBackendException("Unable to check if index exists", e); |
| } |
| } |
| |
| /* |
| ################# UTILITY METHODS ####################### |
| */ |
| |
| private static Mapping getStringMapping(KeyInformation information) { |
| assert AttributeUtils.isString(information.getDataType()); |
| Mapping map = Mapping.getMapping(information); |
| if (map==Mapping.DEFAULT) map = Mapping.TEXT; |
| return map; |
| } |
| |
| private static Map<Geo, String> spatialPredicates() { |
| return Collections.unmodifiableMap(Stream.of( |
| new SimpleEntry<>(Geo.WITHIN, "IsWithin"), |
| new SimpleEntry<>(Geo.CONTAINS, "Contains"), |
| new SimpleEntry<>(Geo.INTERSECT, "Intersects"), |
| new SimpleEntry<>(Geo.DISJOINT, "IsDisjointTo")) |
| .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))); |
| } |
| |
| private UpdateRequest newUpdateRequest() { |
| final UpdateRequest req = new UpdateRequest(); |
| if(waitSearcher) { |
| req.setAction(UpdateRequest.ACTION.COMMIT, true, true); |
| } |
| return req; |
| } |
| |
// Wraps a low-level Solr failure as a temporary backend exception so callers may retry.
// NOTE(review): not referenced in this chunk; presumably called elsewhere in the file.
private BackendException storageException(Exception solrException) {
    return new TemporaryBackendException("Unable to complete query on Solr.", solrException);
}
| |
| private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection) |
| throws IOException, SolrServerException, KeeperException, InterruptedException { |
| if (!checkIfCollectionExists(client, collection)) { |
| final Integer numShards = config.get(NUM_SHARDS); |
| final Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE); |
| final Integer replicationFactor = config.get(REPLICATION_FACTOR); |
| |
| |
| // Ideally this property used so a new configset is not uploaded for every single |
| // index (collection) created in solr. |
| // if a generic configSet is not set, make the configset name the same as the collection. |
| // This was the default behavior before a default configSet could be specified |
| final String genericConfigSet = config.has(SOLR_DEFAULT_CONFIG) ? config.get(SOLR_DEFAULT_CONFIG):collection; |
| |
| final CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collection, genericConfigSet, numShards, replicationFactor); |
| createRequest.setMaxShardsPerNode(maxShardsPerNode); |
| |
| final CollectionAdminResponse createResponse = createRequest.process(client); |
| if (createResponse.isSuccess()) { |
| logger.trace("Collection {} successfully created.", collection); |
| } else { |
| throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages())); |
| } |
| } |
| |
| waitForRecoveriesToFinish(client, collection); |
| } |
| |
| /** |
| * Checks if the collection has already been created in Solr. |
| */ |
| private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { |
| final ZkStateReader zkStateReader = server.getZkStateReader(); |
| zkStateReader.forceUpdateCollection(collection); |
| final ClusterState clusterState = zkStateReader.getClusterState(); |
| return clusterState.getCollectionOrNull(collection) != null; |
| } |
| |
| /** |
| * Wait for all the collection shards to be ready. |
| */ |
| private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { |
| final ZkStateReader zkStateReader = server.getZkStateReader(); |
| try { |
| boolean cont = true; |
| |
| while (cont) { |
| boolean sawLiveRecovering = false; |
| zkStateReader.forceUpdateCollection(collection); |
| final ClusterState clusterState = zkStateReader.getClusterState(); |
| final Map<String, Slice> slices = clusterState.getCollection(collection).getSlicesMap(); |
| Preconditions.checkNotNull(slices, "Could not find collection:" + collection); |
| |
| // change paths for Replica.State per Solr refactoring |
| // remove SYNC state per: http://tinyurl.com/pag6rwt |
| for (final Map.Entry<String, Slice> entry : slices.entrySet()) { |
| final Map<String, Replica> shards = entry.getValue().getReplicasMap(); |
| for (final Map.Entry<String, Replica> shard : shards.entrySet()) { |
| final String state = shard.getValue().getStr(ZkStateReader.STATE_PROP).toUpperCase(); |
| if ((Replica.State.RECOVERING.name().equals(state) || Replica.State.DOWN.name().equals(state)) |
| && clusterState.liveNodesContain(shard.getValue().getStr( |
| ZkStateReader.NODE_NAME_PROP))) { |
| sawLiveRecovering = true; |
| } |
| } |
| } |
| |
| |
| if (!sawLiveRecovering) { |
| cont = false; |
| } else { |
| Thread.sleep(1000); |
| } |
| } |
| } finally { |
| logger.info("Exiting solr wait"); |
| } |
| } |
| } |