| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.solr; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| import javax.activation.MimetypesFileTypeMap; |
| |
| import org.apache.camel.Exchange; |
| import org.apache.camel.WrappedFile; |
| import org.apache.camel.support.DefaultProducer; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrQuery; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION; |
| import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; |
| import org.apache.solr.client.solrj.request.DirectXmlRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.response.QueryResponse; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| |
| /** |
| * The Solr producer. |
| */ |
| public class SolrProducer extends DefaultProducer { |
| private SolrClient httpServer; |
| private SolrClient concSolrServer; |
| private SolrClient cloudSolrServer; |
| |
| public SolrProducer(SolrEndpoint endpoint, SolrClient solrServer, SolrClient concSolrServer, |
| SolrClient cloudSolrServer) { |
| super(endpoint); |
| this.httpServer = solrServer; |
| this.concSolrServer = concSolrServer; |
| this.cloudSolrServer = cloudSolrServer; |
| } |
| |
| private SolrClient getBestSolrServer(String operation) { |
| if (this.cloudSolrServer != null) { |
| return this.cloudSolrServer; |
| } else if (SolrConstants.OPERATION_INSERT_STREAMING.equals(operation)) { |
| return this.concSolrServer; |
| } else { |
| return this.httpServer; |
| } |
| } |
| |
| @Override |
| public void process(Exchange exchange) throws Exception { |
| |
| String operation = (String) exchange.getIn().getHeader(SolrConstants.OPERATION); |
| |
| if (operation == null) { |
| throw new IllegalArgumentException(SolrConstants.OPERATION + " header is missing"); |
| } |
| |
| SolrClient serverToUse = getBestSolrServer(operation); |
| |
| if (operation.equalsIgnoreCase(SolrConstants.OPERATION_INSERT)) { |
| insert(exchange, serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_INSERT_STREAMING)) { |
| insert(exchange, serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_DELETE_BY_ID)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.deleteById(exchange.getIn().getBody(String.class)); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_DELETE_BY_QUERY)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.deleteByQuery(exchange.getIn().getBody(String.class)); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_ADD_BEAN)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.add(serverToUse.getBinder().toSolrInputDocument(exchange.getIn().getBody())); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_ADD_BEANS)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| Collection<Object> body = exchange.getIn().getBody(Collection.class); |
| updateRequest.add(body.stream().map(serverToUse.getBinder()::toSolrInputDocument).collect(Collectors.toList())); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_COMMIT)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.setAction(ACTION.COMMIT, true, true); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_SOFT_COMMIT)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.setAction(ACTION.COMMIT, true, true, true); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_ROLLBACK)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.rollback(); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_OPTIMIZE)) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.setAction(ACTION.OPTIMIZE, true, true, 1); |
| updateRequest.process(serverToUse); |
| } else if (operation.equalsIgnoreCase(SolrConstants.OPERATION_QUERY)) { |
| query(exchange, serverToUse); |
| } else { |
| throw new IllegalArgumentException( |
| SolrConstants.OPERATION + " header value '" + operation + "' is not supported"); |
| } |
| } |
| |
| private void query(Exchange exchange, SolrClient serverToUse) throws SolrServerException, IOException { |
| SolrQuery solrQuery = new SolrQuery(); |
| if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(SolrConstants.QUERY_STRING))) { |
| solrQuery.setQuery(exchange.getMessage().getHeader(SolrConstants.QUERY_STRING, String.class)); |
| } else { |
| throw new IllegalArgumentException("Query String needs to be set as header while querying Solr"); |
| } |
| QueryRequest queryRequest = new QueryRequest(solrQuery); |
| queryRequest.setBasicAuthCredentials(getEndpoint().getUsername(), getEndpoint().getPassword()); |
| QueryResponse p = queryRequest.process(serverToUse); |
| exchange.getMessage().setBody(p.getResults()); |
| } |
| |
| private void insert(Exchange exchange, SolrClient solrServer) throws Exception { |
| Object body = exchange.getIn().getBody(); |
| boolean invalid = false; |
| if (body instanceof WrappedFile) { |
| body = ((WrappedFile<?>) body).getFile(); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class))) { |
| String mimeType = exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class); |
| ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler()); |
| updateRequest.setBasicAuthCredentials(getEndpoint().getUsername(), getEndpoint().getPassword()); |
| updateRequest.addFile((File) body, mimeType); |
| |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.PARAM)) { |
| String paramName = entry.getKey().substring(SolrConstants.PARAM.length()); |
| updateRequest.setParam(paramName, entry.getValue().toString()); |
| } |
| } |
| |
| updateRequest.process(solrServer); |
| } else { |
| |
| if (body instanceof File) { |
| MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); |
| String mimeType = mimeTypesMap.getContentType((File) body); |
| ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler()); |
| updateRequest.setBasicAuthCredentials(getEndpoint().getUsername(), getEndpoint().getPassword()); |
| updateRequest.addFile((File) body, mimeType); |
| |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.PARAM)) { |
| String paramName = entry.getKey().substring(SolrConstants.PARAM.length()); |
| updateRequest.setParam(paramName, entry.getValue().toString()); |
| } |
| } |
| |
| updateRequest.process(solrServer); |
| |
| } else if (body instanceof SolrInputDocument) { |
| |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.add((SolrInputDocument) body); |
| |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.PARAM)) { |
| String paramName = entry.getKey().substring(SolrConstants.PARAM.length()); |
| updateRequest.setParam(paramName, entry.getValue().toString()); |
| } |
| } |
| |
| updateRequest.process(solrServer); |
| |
| } else if (body instanceof List<?>) { |
| List<?> list = (List<?>) body; |
| |
| if (list.size() > 0 && list.get(0) instanceof SolrInputDocument) { |
| UpdateRequest updateRequest = createUpdateRequest(); |
| updateRequest.add((List<SolrInputDocument>) list); |
| |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.PARAM)) { |
| String paramName = entry.getKey().substring(SolrConstants.PARAM.length()); |
| updateRequest.setParam(paramName, entry.getValue().toString()); |
| } |
| } |
| |
| updateRequest.process(solrServer); |
| } else { |
| invalid = true; |
| } |
| |
| } else { |
| |
| boolean hasSolrHeaders = false; |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.FIELD)) { |
| hasSolrHeaders = true; |
| break; |
| } |
| } |
| |
| if (hasSolrHeaders) { |
| |
| UpdateRequest updateRequest = createUpdateRequest(); |
| |
| SolrInputDocument doc = new SolrInputDocument(); |
| for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) { |
| if (entry.getKey().startsWith(SolrConstants.FIELD)) { |
| String fieldName = entry.getKey().substring(SolrConstants.FIELD.length()); |
| doc.setField(fieldName, entry.getValue()); |
| } |
| } |
| updateRequest.add(doc); |
| updateRequest.process(solrServer); |
| |
| } else if (body instanceof String) { |
| |
| String bodyAsString = (String) body; |
| |
| if (!bodyAsString.startsWith("<add")) { |
| bodyAsString = "<add>" + bodyAsString + "</add>"; |
| } |
| |
| DirectXmlRequest xmlRequest = new DirectXmlRequest(getRequestHandler(), bodyAsString); |
| xmlRequest.setBasicAuthCredentials(getEndpoint().getUsername(), getEndpoint().getPassword()); |
| |
| solrServer.request(xmlRequest); |
| } else { |
| invalid = true; |
| } |
| } |
| } |
| |
| if (invalid) { |
| throw new SolrException( |
| SolrException.ErrorCode.BAD_REQUEST, |
| "unable to find data in Exchange to update Solr"); |
| } |
| } |
| |
| private String getRequestHandler() { |
| String requestHandler = getEndpoint().getRequestHandler(); |
| return (requestHandler == null) ? "/update" : requestHandler; |
| } |
| |
| private UpdateRequest createUpdateRequest() { |
| UpdateRequest updateRequest = new UpdateRequest(getRequestHandler()); |
| updateRequest.setBasicAuthCredentials(getEndpoint().getUsername(), getEndpoint().getPassword()); |
| return updateRequest; |
| } |
| |
| @Override |
| public SolrEndpoint getEndpoint() { |
| return (SolrEndpoint) super.getEndpoint(); |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| getEndpoint().onProducerShutdown(this); |
| } |
| |
| } |