blob: b23a517ff32fddef6a42aa39796f10de1f457d0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.solr.writer;
import static org.apache.metron.solr.SolrConstants.SOLR_WRITER_NAME;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.solr.SolrConstants;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
/**
 * Key that {@code isKerberosEnabled} looks up in the Storm configuration map. When the
 * associated value is a non-empty string, {@code init} registers a
 * {@code Krb5HttpClientConfigurer} before creating the Solr client.
 */
public static final String JAVA_SECURITY_CONFIG_PROPERTY = "java.security.auth.login.config";
/**
 * Global-configuration keys read by the Solr writer, each paired with an optional default value
 * that is used when the key is missing or its value cannot be converted.
 */
public enum SolrProperties {
  ZOOKEEPER_QUORUM(SolrConstants.SOLR_ZOOKEEPER),
  COMMIT_PER_BATCH("solr.commitPerBatch", Optional.of(true)),
  COMMIT_WAIT_SEARCHER("solr.commit.waitSearcher", Optional.of(true)),
  COMMIT_WAIT_FLUSH("solr.commit.waitFlush", Optional.of(true)),
  COMMIT_SOFT("solr.commit.soft", Optional.of(false)),
  DEFAULT_COLLECTION("solr.collection", Optional.of("metron")),
  HTTP_CONFIG("solr.http.config", Optional.of(new HashMap<>()));

  /** Key under which the property is stored in the global configuration. */
  String name;

  /** Fallback value; empty when the property has no default. */
  Optional<Object> defaultValue;

  /** Declares a property that has no default value. */
  SolrProperties(String name) {
    this(name, Optional.empty());
  }

  /** Declares a property with the given (possibly empty) default value. */
  SolrProperties(String name, Optional<Object> defaultValue) {
    this.name = name;
    this.defaultValue = defaultValue;
  }

  /**
   * Looks this property up in the global configuration and converts it to the requested type.
   *
   * <p>If the key is present but the conversion yields {@code null} or throws a
   * {@link ClassCastException}, a warning is logged and the default is used instead.
   *
   * @param globalConfig the global configuration map to read from
   * @param clazz the type the value should be converted to
   * @param <T> the requested value type
   * @return the converted configured value, otherwise the converted default, otherwise empty
   */
  public <T> Optional<T> coerceOrDefault(Map<String, Object> globalConfig, Class<T> clazz) {
    Object configured = globalConfig.get(name);
    if (configured != null) {
      T converted;
      try {
        converted = ConversionUtils.convert(configured, clazz);
      } catch (ClassCastException cce) {
        converted = null;
      }
      if (converted != null) {
        return Optional.of(converted);
      }
      // The configured value is unusable; report it and fall through to the default.
      LOG.warn("Unable to convert {} to {}, was {}", name, clazz.getName(), "" + configured);
    }
    if (!defaultValue.isPresent()) {
      return Optional.empty();
    }
    return Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz));
  }

  /**
   * Builds a supplier of the exception thrown when this property cannot be resolved.
   * The message is assembled immediately, so it reflects the configuration at call time.
   *
   * @param globalConfig the global configuration map the lookup was made against
   * @return a supplier producing an {@link IllegalArgumentException} naming this property
   */
  public Supplier<IllegalArgumentException> errorOut(Map<String, Object> globalConfig) {
    Object associated = globalConfig.get(name);
    String message = "Unable to retrieve " + name + " from global config, value associated is " + associated;
    return () -> new IllegalArgumentException(message);
  }

  /**
   * Same as {@link #coerceOrDefault(Map, Class)}, but fails when no value can be resolved.
   *
   * @param globalConfig the global configuration map to read from
   * @param clazz the type the value should be converted to
   * @param <T> the requested value type
   * @return the resolved value
   * @throws IllegalArgumentException if neither a usable configured value nor a default exists
   */
  public <T> T coerceOrDefaultOrExcept(Map<String, Object> globalConfig, Class<T> clazz) {
    Optional<T> resolved = this.coerceOrDefault(globalConfig, clazz);
    Supplier<IllegalArgumentException> failure = this.errorOut(globalConfig);
    return resolved.orElseThrow(failure);
  }
}
// Logger for this class, resolved through a MethodHandles lookup.
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// The settings below are assigned in initializeFromGlobalConfig.
// When true, write() issues a commit after adding each batch (SolrProperties.COMMIT_PER_BATCH).
private Boolean shouldCommit;
// Passed as the last argument of solr.commit in write() (SolrProperties.COMMIT_SOFT).
private Boolean softCommit;
// Passed as the waitSearcher argument of solr.commit in write() (SolrProperties.COMMIT_WAIT_SEARCHER).
private Boolean waitSearcher;
// Passed as the waitFlush argument of solr.commit in write() (SolrProperties.COMMIT_WAIT_FLUSH).
private Boolean waitFlush;
// Value of SolrProperties.ZOOKEEPER_QUORUM; handed to the MetronSolrClient constructor in init().
private String zookeeperUrl;
// Value of SolrProperties.DEFAULT_COLLECTION; set on the client at the end of init().
private String defaultCollection;
// Value of SolrProperties.HTTP_CONFIG; handed to the MetronSolrClient constructor in init().
private Map<String, Object> solrHttpConfig;
// Client used for all Solr calls; injected via withMetronSolrClient or created in init() when still null.
private MetronSolrClient solr;
/**
 * Supplies the Solr client this writer should use. When a client has been supplied,
 * {@code init} does not create one of its own.
 *
 * @param solr the client to use
 * @return this writer, to allow call chaining
 */
public SolrWriter withMetronSolrClient(MetronSolrClient solr) {
  this.solr = solr;
  return this;
}
/**
 * Reads every writer setting from the global configuration, applying the defaults declared on
 * {@code SolrProperties}.
 *
 * @param globalConfiguration the global configuration map
 * @throws IllegalArgumentException if a setting has neither a usable value nor a default
 */
public void initializeFromGlobalConfig(Map<String, Object> globalConfiguration) {
  final Map<String, Object> config = globalConfiguration;

  // Connection and collection settings.
  this.zookeeperUrl =
      SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(config, String.class);
  this.defaultCollection =
      SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(config, String.class);
  this.solrHttpConfig =
      SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(config, Map.class);

  // Commit behaviour.
  this.shouldCommit =
      SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(config, Boolean.class);
  this.softCommit =
      SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(config, Boolean.class);
  this.waitSearcher =
      SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(config, Boolean.class);
  this.waitFlush =
      SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(config, Boolean.class);
}
/**
 * Loads the writer settings from the global configuration, logs them, and makes sure a Solr
 * client is available with its default collection set.
 *
 * <p>A client is only created when none was supplied beforehand; in that case a Kerberos HTTP
 * client configurer is registered first if the Storm configuration enables it.
 *
 * @param stormConf the Storm configuration map (may be {@code null})
 * @param configurations the writer configuration providing the global configuration
 */
@Override
public void init(Map stormConf, WriterConfiguration configurations) throws IOException, SolrServerException {
  initializeFromGlobalConfig(configurations.getGlobalConfig());

  LOG.info("Initializing SOLR writer: {}", zookeeperUrl);
  LOG.info("Forcing commit per batch: {}", shouldCommit);
  LOG.info("Soft commit: {}", softCommit);
  LOG.info("Commit Wait Searcher: {}", waitSearcher);
  LOG.info("Commit Wait Flush: {}", waitFlush);
  LOG.info("Default Collection: {}", String.valueOf(defaultCollection));

  boolean clientMissing = (solr == null);
  if (clientMissing) {
    boolean kerberos = isKerberosEnabled(stormConf);
    if (kerberos) {
      HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
    }
    solr = new MetronSolrClient(zookeeperUrl, solrHttpConfig);
  }
  solr.setDefaultCollection(defaultCollection);
}
/**
 * Converts each bulk message into a Solr input document.
 *
 * <p>Every key of the message becomes a field named after the key's string form. A value that
 * is an {@link Iterable} is added element by element to the same field; any other value is
 * added as is. A document that has no {@code Constants.GUID} entry afterwards receives a
 * random UUID under that key.
 *
 * @param messages the messages to convert
 * @return one document per message, in iteration order
 */
public Collection<SolrInputDocument> toDocs(Iterable<BulkMessage<JSONObject>> messages) {
  Collection<SolrInputDocument> documents = new ArrayList<>();
  for (BulkMessage<JSONObject> bulkMessage : messages) {
    SolrInputDocument doc = new SolrInputDocument();
    JSONObject source = bulkMessage.getMessage();
    for (Object key : source.keySet()) {
      String fieldName = String.valueOf(key);
      Object fieldValue = source.get(key);
      if (!(fieldValue instanceof Iterable)) {
        doc.addField(fieldName, fieldValue);
        continue;
      }
      // Multi-valued field: add each element separately.
      for (Object element : (Iterable<?>) fieldValue) {
        doc.addField(fieldName, element);
      }
    }
    boolean hasGuid = doc.containsKey(Constants.GUID);
    if (!hasGuid) {
      doc.addField(Constants.GUID, UUID.randomUUID().toString());
    }
    documents.add(doc);
  }
  return documents;
}
/**
 * Resolves the collection to write to for a source type.
 *
 * @param sourceType the source type of the messages
 * @param configurations the writer configuration queried for the source type's index
 * @return the configured index, or the client's default collection when the index is empty
 */
protected String getCollection(String sourceType, WriterConfiguration configurations) {
  String configured = configurations.getIndex(sourceType);
  return StringUtils.isEmpty(configured) ? solr.getDefaultCollection() : configured;
}
/**
 * Adds a batch of messages to Solr and, when commit-per-batch is enabled, commits it.
 *
 * <p>The outcome is reported for the batch as a whole: all message ids are recorded either as
 * successes or as errors. A {@code HttpSolrClient.RemoteSolrException} is caught and recorded
 * as the error for every id; other exceptions propagate to the caller.
 *
 * @param sourceType the source type used to resolve the target collection
 * @param configurations the writer configuration
 * @param messages the messages to write
 * @return the response listing the successful or failed message ids
 */
@Override
public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
  String collection = getCollection(sourceType, configurations);
  BulkWriterResponse bulkResponse = new BulkWriterResponse();
  Collection<SolrInputDocument> docs = toDocs(messages);
  Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
  try {
    // Solr commits the entire batch or throws an exception for it. There's no way to get partial failures.
    Optional<SolrException> addFailure = fromUpdateResponse(solr.add(collection, docs));
    if (addFailure.isPresent()) {
      bulkResponse.addAllErrors(addFailure.get(), ids);
      return bulkResponse;
    }

    Optional<SolrException> commitFailure = Optional.empty();
    if (shouldCommit) {
      commitFailure = fromUpdateResponse(solr.commit(collection, waitFlush, waitSearcher, softCommit));
    }

    if (commitFailure.isPresent()) {
      bulkResponse.addAllErrors(commitFailure.get(), ids);
    } else {
      bulkResponse.addAllSuccesses(ids);
    }
  } catch (HttpSolrClient.RemoteSolrException sse) {
    bulkResponse.addAllErrors(sse, ids);
  }
  return bulkResponse;
}
/**
 * Translates a Solr update response into an optional failure.
 *
 * @param response the response to inspect; may be {@code null}
 * @return a {@code BAD_REQUEST} {@code SolrException} describing the response when its status
 *     is greater than zero, otherwise empty (including for a {@code null} response)
 */
protected Optional<SolrException> fromUpdateResponse(UpdateResponse response) {
  if (response == null || response.getStatus() <= 0) {
    return Optional.empty();
  }
  String details = Joiner.on(",").join(response.getResponse());
  String message = "Solr Update response: " + details;
  return Optional.of(new SolrException(SolrException.ErrorCode.BAD_REQUEST, message));
}
/**
 * Returns the name of this writer.
 *
 * @return the {@code SOLR_WRITER_NAME} constant
 */
@Override
public String getName() {
  return SOLR_WRITER_NAME;
}
/**
 * Closes the Solr client if one has been set; does nothing otherwise.
 */
@Override
public void close() throws Exception {
  if (solr == null) {
    return;
  }
  solr.close();
}
/**
 * Tells whether the Storm configuration carries a non-empty value under
 * {@code JAVA_SECURITY_CONFIG_PROPERTY}.
 *
 * @param stormConfig the Storm configuration map; may be {@code null}
 * @return {@code true} when the value is a non-empty string, {@code false} when the map is
 *     {@code null} or the value is missing or empty
 */
private boolean isKerberosEnabled(Map stormConfig) {
  if (stormConfig != null) {
    String loginConfig = (String) stormConfig.get(JAVA_SECURITY_CONFIG_PROPERTY);
    return !(loginConfig == null || loginConfig.isEmpty());
  }
  return false;
}
}