blob: f8950c037ca95931b1956615656a33833e9052fb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.solr.mapper;
import static org.apache.storm.solr.schema.SolrFieldTypeFinder.FieldTypeWrapper;
import java.util.LinkedList;
import java.util.List;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.storm.solr.schema.FieldType;
import org.apache.storm.solr.schema.Schema;
import org.apache.storm.solr.schema.SolrFieldTypeFinder;
import org.apache.storm.solr.schema.builder.SchemaBuilder;
import org.apache.storm.tuple.ITuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrFieldsMapper implements SolrMapper {
private static final Logger log = LoggerFactory.getLogger(SolrFieldsMapper.class);
private String collection;
private SolrFieldTypeFinder typeFinder;
private String multiValueFieldToken;
private SolrFieldsMapper(Builder builder) {
this.collection = builder.collection;
this.typeFinder = builder.typeFinder;
this.multiValueFieldToken = builder.multiValueFieldToken;
log.debug("Created {} with the following configuration: [{}] "
+ this.getClass().getSimpleName(), this.toString());
}
@Override
public void configure() {
typeFinder.initialize();
}
@Override
public String getCollection() {
return collection;
}
@Override
public SolrRequest toSolrRequest(List<? extends ITuple> tuples) throws SolrMapperException {
List<SolrInputDocument> docs = new LinkedList<>();
for (ITuple tuple : tuples) {
docs.add(buildDocument(tuple));
}
UpdateRequest request = new UpdateRequest();
request.add(docs);
log.debug("Created SolrRequest with content: {}", docs);
return request;
}
@Override
public SolrRequest toSolrRequest(ITuple tuple) throws SolrMapperException {
SolrInputDocument doc = buildDocument(tuple);
UpdateRequest request = new UpdateRequest();
request.add(doc);
log.debug("Created SolrRequest with content: {}", doc);
return request;
}
private SolrInputDocument buildDocument(ITuple tuple) {
SolrInputDocument doc = new SolrInputDocument();
for (String tupleField : tuple.getFields()) {
FieldTypeWrapper fieldTypeWrapper = typeFinder.getFieldTypeWrapper(tupleField);
if (fieldTypeWrapper != null) {
FieldType fieldType = fieldTypeWrapper.getType();
if (fieldType.isMultiValued()) {
addMultivalueFieldToDoc(doc, tupleField, tuple);
} else {
addFieldToDoc(doc, tupleField, tuple);
}
} else {
log.debug("Field [{}] does NOT match static or dynamic field declared in schema. Not added to document", tupleField);
}
}
return doc;
}
private void addFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) {
Object val = getValue(tupleField, tuple);
log.debug("Adding to document (field, val) = ({}, {})", tupleField, val);
doc.addField(tupleField, val);
}
private void addMultivalueFieldToDoc(SolrInputDocument doc, String tupleField, ITuple tuple) {
String[] values = getValues(tupleField, tuple);
for (String value : values) {
log.debug("Adding {} to multivalue field document {}", value, tupleField);
doc.addField(tupleField, value);
}
}
private Object getValue(String field, ITuple tuple) {
return tuple.getValueByField(field);
}
private String[] getValues(String field, ITuple tuple) {
String multiValueField = tuple.getStringByField(field);
String[] values = multiValueField.split(multiValueFieldToken);
return values;
}
@Override
public String toString() {
return "SolrFieldsMapper{"
+ "collection='" + collection + '\''
+ ", typeFinder=" + typeFinder
+ ", multiValueFieldToken='" + multiValueFieldToken + '\''
+ '}';
}
public static class Builder {
private String collection;
private SolrFieldTypeFinder typeFinder;
private String multiValueFieldToken = "|";
/**
* {@link SolrFieldsMapper} builder class.
* @param schemaBuilder Solr {@link Schema} builder class
* @param collection Name of the Solr collection to update using the SolrRequest
*/
public Builder(SchemaBuilder schemaBuilder, String collection) {
setTypeFinder(schemaBuilder);
this.collection = collection;
}
/**
* {@link SolrFieldsMapper} builder class.
* @param schemaBuilder Solr {@link Schema} builder class
* @param solrClient {@link SolrClient} implementation from where to extract the default Solr collection, if any defined.
*/
public Builder(SchemaBuilder schemaBuilder, SolrClient solrClient) {
setTypeFinder(schemaBuilder);
}
//TODO Handle the case where there may be no schema
private void setTypeFinder(SchemaBuilder schemaBuilder) {
typeFinder = new SolrFieldTypeFinder(schemaBuilder);
}
// Sets {@link SolrFieldsMapper} to use the default Solr collection if there is one defined
private void setDefaultCollection(SolrClient solrClient) {
String defaultCollection = null;
if (solrClient instanceof CloudSolrClient) {
defaultCollection = ((CloudSolrClient) solrClient).getDefaultCollection();
}
this.collection = defaultCollection;
}
/**
* Sets the token that separates multivalue fields in tuples. The default token is |
* */
public Builder setMultiValueFieldToken(String multiValueFieldToken) {
this.multiValueFieldToken = multiValueFieldToken;
return this;
}
public SolrFieldsMapper build() {
return new SolrFieldsMapper(this);
}
}
}