blob: 8e1615b278b7c6f4d94e3cf2845fe8ec7eb730e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.emitter.solr;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.client.HttpClientFactory;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.TikaEmitterException;
/**
 * Emitter that sends Tika {@link Metadata} lists to Solr.
 * <p>
 * The Solr client is either a load-balanced client over a fixed list of {@code solrUrls}
 * or a {@link CloudSolrClient} that discovers servers via {@code solrZkHosts}. Exactly one
 * of the two must be configured (see {@link #checkInitialization}).
 */
public class SolrEmitter extends AbstractEmitter implements Initializable {

    private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);

    private final HttpClientFactory httpClientFactory;
    private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
    private UpdateStrategy updateStrategy = UpdateStrategy.ADD;
    private String solrCollection;
    /**
     * You can specify solrUrls, or you can specify solrZkHosts and use ZooKeeper to
     * determine the Solr server urls.
     */
    private List<String> solrUrls;
    private List<String> solrZkHosts;
    private String solrZkChroot;
    private String idField = "id";
    private int commitWithin = 1000;
    private int connectionTimeout = 10000;
    private int socketTimeout = 60000;
    private SolrClient solrClient;

    public SolrEmitter() throws TikaConfigException {
        httpClientFactory = new HttpClientFactory();
    }

    /**
     * Converts one container document (and its attachments) to Solr documents and sends
     * them in a single update request. Logs a warning and does nothing when
     * {@code metadataList} is null or empty.
     *
     * @param emitKey      used as the id of the container document
     * @param metadataList first entry is the container document, the rest are attachments
     * @throws TikaEmitterException if the Solr update request fails
     */
    @Override
    public void emit(String emitKey, List<Metadata> metadataList)
            throws IOException, TikaEmitterException {
        if (metadataList == null || metadataList.isEmpty()) {
            LOG.warn("metadataList is null or empty");
            return;
        }
        List<SolrInputDocument> docsToUpdate = new ArrayList<>();
        addMetadataAsSolrInputDocuments(emitKey, metadataList, docsToUpdate);
        emitSolrBatch(docsToUpdate);
    }

    /**
     * Converts every entry of {@code batch} to Solr documents and sends them all in a
     * single update request. Entries without metadata are skipped with a warning, so one
     * bad entry does not abort the whole batch.
     *
     * @throws TikaEmitterException if the Solr update request fails
     */
    @Override
    public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
        if (batch == null || batch.isEmpty()) {
            LOG.warn("batch is null or empty");
            return;
        }
        List<SolrInputDocument> docsToUpdate = new ArrayList<>();
        for (EmitData d : batch) {
            String emitKey = d.getEmitKey().getEmitKey();
            List<Metadata> metadataList = d.getMetadataList();
            // the conversion below reads metadataList.get(0); guard like the single emit does
            if (metadataList == null || metadataList.isEmpty()) {
                LOG.warn("metadataList is null or empty for emitKey {}", emitKey);
                continue;
            }
            addMetadataAsSolrInputDocuments(emitKey, metadataList, docsToUpdate);
        }
        emitSolrBatch(docsToUpdate);
    }

    /**
     * Converts one emit key's metadata list to Solr documents and appends them to
     * {@code docsToUpdate}. The first Metadata becomes the container document (id =
     * {@code emitKey}); any further entries are attachments, handled according to the
     * configured {@link AttachmentStrategy}.
     *
     * @param metadataList must contain at least one entry
     */
    private void addMetadataAsSolrInputDocuments(String emitKey, List<Metadata> metadataList,
            List<SolrInputDocument> docsToUpdate) throws IOException, TikaEmitterException {
        SolrInputDocument parent = newSolrInputDocument(emitKey);
        // the _version_ constraint is only placed on the container document
        if (updateStrategy == UpdateStrategy.UPDATE_MUST_EXIST) {
            parent.setField("_version_", 1);
        } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
            parent.setField("_version_", -1);
        }
        addMetadataToSolrInputDocument(metadataList.get(0), parent, updateStrategy);

        if (metadataList.size() == 1) {
            docsToUpdate.add(parent);
            return;
        }
        List<Metadata> attachments = metadataList.subList(1, metadataList.size());
        switch (attachmentStrategy) {
            case PARENT_CHILD:
                for (Metadata attachment : attachments) {
                    parent.addChildDocument(newAttachmentDocument(emitKey, attachment));
                }
                docsToUpdate.add(parent);
                break;
            case SEPARATE_DOCUMENTS:
                docsToUpdate.add(parent);
                for (Metadata attachment : attachments) {
                    docsToUpdate.add(newAttachmentDocument(emitKey, attachment));
                }
                break;
            default:
                throw new IllegalArgumentException(
                        "I don't yet support this attachment strategy: " + attachmentStrategy);
        }
    }

    /**
     * Builds the Solr document for one attachment. Its id is
     * {@code emitKey + "-" + randomUUID} for both attachment strategies.
     */
    private SolrInputDocument newAttachmentDocument(String emitKey, Metadata attachment) {
        SolrInputDocument doc = newSolrInputDocument(emitKey + "-" + UUID.randomUUID());
        addMetadataToSolrInputDocument(attachment, doc, updateStrategy);
        return doc;
    }

    /** Creates an empty Solr document whose {@link #idField} is set to {@code id}. */
    private SolrInputDocument newSolrInputDocument(String id) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.setField(idField, id);
        return doc;
    }

    /**
     * Sends {@code docsToUpdate} to {@link #solrCollection} in one {@link UpdateRequest}
     * with the configured commitWithin. Does nothing if the list is empty.
     *
     * @throws TikaEmitterException wrapping any failure from the Solr client
     */
    private void emitSolrBatch(List<SolrInputDocument> docsToUpdate)
            throws IOException, TikaEmitterException {
        // parameterized logging: the batch is only stringified when debug is enabled
        LOG.debug("Emitting solr doc batch: {}", docsToUpdate);
        if (docsToUpdate.isEmpty()) {
            return;
        }
        try {
            UpdateRequest req = new UpdateRequest();
            req.add(docsToUpdate);
            req.setCommitWithin(commitWithin);
            // NOTE(review): intended so that _version_ conflicts from the UPDATE_MUST_*
            // strategies do not fail the whole batch -- see Solr's partial update docs
            req.setParam("failOnVersionConflicts", "false");
            req.process(solrClient, solrCollection);
        } catch (Exception e) {
            throw new TikaEmitterException("Could not add batch to solr", e);
        }
    }

    /**
     * Copies every non-empty metadata field into {@code solrInputDocument}. A single value
     * is written as a scalar, multiple values as a {@code String[]}. With
     * {@link UpdateStrategy#ADD} the value is written directly. Otherwise it is wrapped in
     * a {@code {"set": value}} atomic-update map.
     */
    private void addMetadataToSolrInputDocument(Metadata metadata,
            SolrInputDocument solrInputDocument, UpdateStrategy updateStrategy) {
        for (String n : metadata.names()) {
            String[] vals = metadata.getValues(n);
            if (vals.length == 0) {
                continue;
            }
            Object value = vals.length == 1 ? vals[0] : vals;
            if (updateStrategy == UpdateStrategy.ADD) {
                solrInputDocument.setField(n, value);
            } else {
                solrInputDocument.setField(n, atomicSet(value));
            }
        }
    }

    /**
     * Builds a {@code {"set": value}} map for a Solr atomic update. This replaces the
     * former double-brace anonymous HashMap subclasses, which captured the enclosing
     * emitter instance.
     */
    private static Map<String, Object> atomicSet(Object value) {
        Map<String, Object> op = new HashMap<>();
        op.put("set", value);
        return op;
    }

    /**
     * Sets how attachments (every Metadata after the first) are indexed. Options are
     * "PARENT_CHILD" and "SEPARATE_DOCUMENTS"; the default is "PARENT_CHILD".
     * <ul>
     * <li>"PARENT_CHILD": each attachment is added as a child document of the container
     * document via Solr's parent-child (nested document) relationship.</li>
     * <li>"SEPARATE_DOCUMENTS": each attachment is sent as its own top-level document,
     * alongside the container document.</li>
     * </ul>
     * In both modes an attachment's id is the emit key, followed by "-" and a random UUID.
     *
     * @param attachmentStrategy name of an {@link AttachmentStrategy} constant
     * @throws IllegalArgumentException if the name is not a known strategy
     */
    @Field
    public void setAttachmentStrategy(String attachmentStrategy) {
        this.attachmentStrategy = AttachmentStrategy.valueOf(attachmentStrategy);
    }

    /**
     * @param updateStrategy name of an {@link UpdateStrategy} constant; default is "ADD"
     * @throws IllegalArgumentException if the name is not a known strategy
     */
    @Field
    public void setUpdateStrategy(String updateStrategy) {
        this.updateStrategy = UpdateStrategy.valueOf(updateStrategy);
    }

    /** Connection timeout passed to the Solr client builder; default 10000. */
    @Field
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /** Socket timeout passed to the Solr client builder; default 60000. */
    @Field
    public void setSocketTimeout(int socketTimeout) {
        this.socketTimeout = socketTimeout;
    }

    public int getCommitWithin() {
        return commitWithin;
    }

    /** Value passed to {@link UpdateRequest#setCommitWithin(int)}; default 1000. */
    @Field
    public void setCommitWithin(int commitWithin) {
        this.commitWithin = commitWithin;
    }

    /**
     * Name of the Solr field that holds the document id (default "id"). The container
     * document's id is the emit key. Attachment documents get the emit key, followed by
     * "-" and a random UUID.
     *
     * @param idField Solr field name for the id
     */
    @Field
    public void setIdField(String idField) {
        this.idField = idField;
    }

    @Field
    public void setSolrCollection(String solrCollection) {
        this.solrCollection = solrCollection;
    }

    /** Base urls for a load-balanced client; mutually exclusive with solrZkHosts. */
    @Field
    public void setSolrUrls(List<String> solrUrls) {
        this.solrUrls = solrUrls;
    }

    /** ZooKeeper hosts for a SolrCloud client; mutually exclusive with solrUrls. */
    @Field
    public void setSolrZkHosts(List<String> solrZkHosts) {
        this.solrZkHosts = solrZkHosts;
    }

    /** Optional ZooKeeper chroot, only used together with solrZkHosts. */
    @Field
    public void setSolrZkChroot(String solrZkChroot) {
        this.solrZkChroot = solrZkChroot;
    }

    //TODO -- add other httpclient configurations??
    @Field
    public void setUserName(String userName) {
        httpClientFactory.setUserName(userName);
    }

    @Field
    public void setPassword(String password) {
        httpClientFactory.setPassword(password);
    }

    @Field
    public void setAuthScheme(String authScheme) {
        httpClientFactory.setAuthScheme(authScheme);
    }

    @Field
    public void setProxyHost(String proxyHost) {
        httpClientFactory.setProxyHost(proxyHost);
    }

    @Field
    public void setProxyPort(int proxyPort) {
        httpClientFactory.setProxyPort(proxyPort);
    }

    /**
     * Builds the Solr client. If no solrUrls are configured, it builds a
     * {@link CloudSolrClient} from solrZkHosts and the optional chroot. Otherwise it
     * builds an {@link LBHttpSolrClient} over solrUrls.
     */
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException {
        if (isNullOrEmpty(solrUrls)) {
            solrClient = new CloudSolrClient.Builder(solrZkHosts, Optional.ofNullable(solrZkChroot))
                    .withConnectionTimeout(connectionTimeout)
                    .withSocketTimeout(socketTimeout)
                    .withHttpClient(httpClientFactory.build())
                    .build();
        } else {
            solrClient = new LBHttpSolrClient.Builder()
                    .withConnectionTimeout(connectionTimeout)
                    .withSocketTimeout(socketTimeout)
                    .withHttpClient(httpClientFactory.build())
                    .withBaseSolrUrls(solrUrls.toArray(new String[0]))
                    .build();
        }
    }

    /**
     * Validates the configuration. solrCollection and idField must be non-empty, and
     * exactly one of solrUrls or solrZkHosts must be given.
     *
     * @throws IllegalArgumentException if neither or both of solrUrls and solrZkHosts
     *                                  are specified
     */
    @Override
    public void checkInitialization(InitializableProblemHandler problemHandler)
            throws TikaConfigException {
        mustNotBeEmpty("solrCollection", this.solrCollection);
        mustNotBeEmpty("idField", this.idField);
        boolean hasUrls = !isNullOrEmpty(this.solrUrls);
        boolean hasZkHosts = !isNullOrEmpty(this.solrZkHosts);
        if (!hasUrls && !hasZkHosts) {
            throw new IllegalArgumentException(
                    "expected either param solrUrls or param solrZkHosts, but neither was specified");
        }
        if (hasUrls && hasZkHosts) {
            throw new IllegalArgumentException(
                    "expected either param solrUrls or param solrZkHosts, but both were specified");
        }
    }

    private static boolean isNullOrEmpty(List<?> list) {
        return list == null || list.isEmpty();
    }

    /** How attachments (every Metadata after the first) are sent to Solr. */
    public enum AttachmentStrategy {
        /** Attachments are sent as independent top-level documents. */
        SEPARATE_DOCUMENTS,
        /** Attachments are nested as child documents of the container document. */
        PARENT_CHILD,
        //anything else?
    }

    /** How field values are written to Solr. */
    public enum UpdateStrategy {
        /** Plain field values; no {@code _version_} constraint. */
        ADD,
        /** Atomic {@code "set"} updates; container document gets {@code _version_ = 1}. */
        UPDATE_MUST_EXIST,
        /** Atomic {@code "set"} updates; container document gets {@code _version_ = -1}. */
        UPDATE_MUST_NOT_EXIST,
    }
}