blob: cf8a47367c73dd260be811180265b3903cec6e40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.CHILDDOC;
import static org.apache.solr.common.util.ByteArrayUtf8CharSequence.convertCharSeq;
/**
 * Provides methods for marshalling an {@link UpdateRequest} to a {@link NamedList} which can be serialized in the
 * javabin format, and for unmarshalling such a stream back into an {@link UpdateRequest}.
 *
 * <p>Unmarshalling streams documents: each decoded document is handed to a {@link StreamingUpdateHandler} as soon as
 * it is read, instead of being accumulated on the returned request. Deletes and request params are returned on the
 * {@link UpdateRequest}.
 *
 * @see org.apache.solr.common.util.JavaBinCodec
 * @since solr 1.4
 */
public class JavaBinUpdateRequestCodec {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Shared flag so that only the first "index-time boost ignored" message is logged at WARN;
   * subsequent ones are logged at DEBUG.
   */
  private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();

  private boolean readStringAsCharSeq = false;

  /**
   * Sets whether the underlying {@link JavaBinCodec} should run with its {@code readStringAsCharSeq} mode enabled
   * while the document list is being streamed. The flag is only applied when a {@link StreamingUpdateHandler} is
   * supplied to {@link #unmarshal(InputStream, StreamingUpdateHandler)}, and it is reset once the document list
   * has been read.
   *
   * @param flag {@code true} to enable char-sequence string reading for streamed documents
   * @return this codec, for call chaining
   */
  public JavaBinUpdateRequestCodec setReadStringAsCharSeq(boolean flag) {
    this.readStringAsCharSeq = flag;
    return this;
  }

  /**
   * Converts an UpdateRequest to a NamedList which can be serialized to the given OutputStream in the javabin format.
   *
   * <p>The written NamedList contains, in order: {@code params} (including {@code commitWithin} when set),
   * {@code delByIdMap} (only when present), {@code delByQ}, and then either {@code docsMap} (an iterator over the
   * document-to-params entries) or {@code docs} (an iterator over the documents, possibly {@code null}).
   *
   * @param updateRequest the UpdateRequest to be written out
   * @param os the OutputStream to which the request is to be written
   *
   * @throws IOException in case of an exception during marshalling or writing to the stream
   */
  @SuppressWarnings({"unchecked"})
  public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException {
    @SuppressWarnings({"rawtypes"})
    NamedList nl = new NamedList();
    @SuppressWarnings({"rawtypes"})
    NamedList params = solrParamsToNamedList(updateRequest.getParams());
    if (updateRequest.getCommitWithin() != -1) {
      params.add("commitWithin", updateRequest.getCommitWithin());
    }
    Iterator<SolrInputDocument> docIter = updateRequest.getDocIterator();
    Map<SolrInputDocument, Map<String, Object>> docMap = updateRequest.getDocumentsMap();

    nl.add("params", params);// 0: params
    if (updateRequest.getDeleteByIdMap() != null) {
      nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
    }
    nl.add("delByQ", updateRequest.getDeleteQuery());

    // Documents are written last and as an Iterator: on the read side the first Iterator encountered is
    // treated as the document stream (see StreamingCodec#readIterator), and it needs "params" to be read already.
    if (docMap != null) {
      nl.add("docsMap", docMap.entrySet().iterator());
    } else {
      if (updateRequest.getDocuments() != null) {
        docIter = updateRequest.getDocuments().iterator();
      }
      nl.add("docs", docIter);
    }
    try (JavaBinCodec codec = new JavaBinCodec()) {
      codec.marshal(nl, os);
    }
  }

  /**
   * Decodes a javabin update request from the given InputStream. Every document in the stream is passed to the given
   * StreamingUpdateHandler as it is decoded. Documents are never added to the returned UpdateRequest; if
   * {@code handler} is {@code null} they are decoded but not delivered anywhere.
   *
   * @param is the InputStream from which to read
   * @param handler an instance of StreamingUpdateHandler to which SolrInputDocuments are streamed one by one
   *
   * @return the UpdateRequest carrying the decoded params, delete-by-id and delete-by-query commands
   *
   * @throws IOException in case of an exception while reading from the input stream or unmarshalling
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
    final UpdateRequest updateRequest = new UpdateRequest();
    // Single-slot holder filled in by StreamingCodec with the outermost NamedList of the payload.
    final NamedList[] namedList = new NamedList[1];
    try (JavaBinCodec codec = new StreamingCodec(namedList, updateRequest, handler)) {
      codec.unmarshal(is);
    }

    // NOTE: params are normally set while reading the document iterator; if the
    // update request contains only delete commands the params must be loaded now
    if (updateRequest.getParams() == null) {
      NamedList params = (NamedList) namedList[0].get("params");
      if (params != null) {
        updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams()));
      }
    }

    List<String> delById = (List<String>) namedList[0].get("delById");
    Map<String, Map<String, Object>> delByIdMap = (Map<String, Map<String, Object>>) namedList[0].get("delByIdMap");
    List<String> delByQ = (List<String>) namedList[0].get("delByQ");

    // "docs"/"docsMap" are intentionally not read here: every document was already handed to the
    // handler while the stream was decoded. Deletes must be passed back on the UpdateRequest.
    if (delById != null) {
      for (String s : delById) {
        updateRequest.deleteById(s);
      }
    }
    if (delByIdMap != null) {
      for (Map.Entry<String, Map<String, Object>> entry : delByIdMap.entrySet()) {
        Map<String, Object> params = entry.getValue();
        if (params != null) {
          Long version = (Long) params.get(UpdateRequest.VER);
          if (params.containsKey(ShardParams._ROUTE_)) {
            updateRequest.deleteById(entry.getKey(), (String) params.get(ShardParams._ROUTE_), version);
          } else {
            updateRequest.deleteById(entry.getKey(), version);
          }
        } else {
          updateRequest.deleteById(entry.getKey());
        }
      }
    }
    if (delByQ != null) {
      for (String s : delByQ) {
        updateRequest.deleteByQuery(s);
      }
    }

    return updateRequest;
  }

  /** Returns {@code params} as a NamedList, or an empty NamedList when {@code params} is {@code null}. */
  @SuppressWarnings({"rawtypes"})
  private NamedList solrParamsToNamedList(SolrParams params) {
    if (params == null) return new NamedList();
    return params.toNamedList();
  }

  /**
   * Logs that an index-time boost read from the stream is being ignored. {@code null} and {@code 1f} boosts are not
   * reported. The first report is logged at WARN level and all later ones at DEBUG.
   *
   * @param kind the boost kind used in the message ("document" or "field")
   * @param boost the boost read from the stream, may be {@code null}
   */
  private static void logIgnoredBoost(String kind, Float boost) {
    if (boost == null || boost.floatValue() == 1f) {
      return;
    }
    if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
      log.warn("Ignoring {} boost: {} as index-time boosts are not supported anymore", kind, boost);
    } else {
      log.debug("Ignoring {} boost: {} as index-time boosts are not supported anymore", kind, boost);
    }
  }

  /** Receives documents one at a time while {@link #unmarshal(InputStream, StreamingUpdateHandler)} decodes them. */
  public interface StreamingUpdateHandler {
    /**
     * Handles one element of the decoded document stream.
     *
     * @param document the decoded document; may be {@code null} (e.g. when the stream element was a params-only
     *     NamedList or could not be converted to a document)
     * @param req the request the document belongs to: usually the UpdateRequest returned by {@code unmarshal}; for
     *     params-only NamedList elements, a fresh UpdateRequest carrying those params
     * @param commitWithin the per-document commitWithin value, or {@code null} if none was sent for this document
     * @param override the per-document overwrite flag, or {@code null} if none was sent for this document
     */
    void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean override);
  }

  /**
   * SolrInputDocument whose {@link #getFieldValue(String)} passes the stored value through {@code convertCharSeq}.
   * NOTE(review): presumably this turns CharSequence values produced in readStringAsCharSeq mode back into
   * Strings for callers — confirm against ByteArrayUtf8CharSequence.
   */
  static class MaskCharSequenceSolrInputDoc extends SolrInputDocument {
    public MaskCharSequenceSolrInputDoc(Map<String, SolrInputField> fields) {
      super(fields);
    }

    @Override
    public Object getFieldValue(String name) {
      return convertCharSeq(super.getFieldValue(name));
    }
  }

  /**
   * JavaBinCodec that records the outermost NamedList of the payload and streams the first Iterator it encounters
   * (the document list) to the {@link StreamingUpdateHandler} instead of materializing it.
   */
  class StreamingCodec extends JavaBinCodec {
    @SuppressWarnings({"rawtypes"})
    private final NamedList[] namedList;
    private final UpdateRequest updateRequest;
    private final StreamingUpdateHandler handler;
    // NOTE: this only works because a new StreamingCodec is created for every unmarshal() call
    // and is therefore only ever used on a single stream -- if this class is ever reused across
    // streams, this flag must be reset.
    private boolean seenOuterMostDocIterator;

    public StreamingCodec(@SuppressWarnings({"rawtypes"})NamedList[] namedList, UpdateRequest updateRequest, StreamingUpdateHandler handler) {
      this.namedList = namedList;
      this.updateRequest = updateRequest;
      this.handler = handler;
      seenOuterMostDocIterator = false;
    }

    /** Creates documents as {@link MaskCharSequenceSolrInputDoc}, preserving insertion order of fields. */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected SolrInputDocument createSolrInputDocument(int sz) {
      return new MaskCharSequenceSolrInputDoc(new LinkedHashMap(sz));
    }

    /**
     * Reads a NamedList and, if none has been recorded yet, stores it in {@code namedList[0]}. The NamedList is
     * allocated before its entries are read, so the first call corresponds to the outermost list of the payload.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public NamedList readNamedList(DataInputInputStream dis) throws IOException {
      int sz = readSize(dis);
      NamedList nl = new NamedList();
      if (namedList[0] == null) {
        namedList[0] = nl;
      }
      for (int i = 0; i < sz; i++) {
        String name = (String) readVal(dis);
        Object val = readVal(dis);
        nl.add(name, val);
      }
      return nl;
    }

    /**
     * Converts the list-of-NamedList document encoding into a SolrInputDocument. Element 0 carries the document
     * boost at index 0; every following element carries (field name, value, field boost) at indexes 0, 1 and 2.
     * Boosts are ignored (and logged) since index-time boosts are not supported.
     */
    @SuppressWarnings({"rawtypes"})
    private SolrInputDocument listToSolrInputDocument(List<NamedList> namedList) {
      SolrInputDocument doc = new SolrInputDocument();
      for (int i = 0; i < namedList.size(); i++) {
        NamedList nl = namedList.get(i);
        if (i == 0) {
          logIgnoredBoost("document", (Float) nl.getVal(0));
        } else {
          logIgnoredBoost("field", (Float) nl.getVal(2));
          doc.addField((String) nl.getVal(0), nl.getVal(1));
        }
      }
      return doc;
    }

    /** Streams the first Iterator in the payload to the handler; any later Iterator is read normally. */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public List readIterator(DataInputInputStream fis) throws IOException {
      // default behavior for reading any regular Iterator in the stream
      if (seenOuterMostDocIterator) return super.readIterator(fis);

      // special treatment for first outermost Iterator
      // (the list of documents)
      seenOuterMostDocIterator = true;
      return readOuterMostDocIterator(fis);
    }

    /**
     * Reads the document Iterator element by element and hands each decoded document to the handler. The request
     * params are taken from the outermost NamedList read so far, which relies on "params" preceding the documents
     * in the payload (as {@code marshal} writes it).
     *
     * @return an empty list when streaming to a handler (documents are not materialized); otherwise the fully read
     *     list from {@link JavaBinCodec#readIterator}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private List readOuterMostDocIterator(DataInputInputStream fis) throws IOException {
      if (namedList[0] == null) namedList[0] = new NamedList();
      NamedList params = (NamedList) namedList[0].get("params");
      if (params == null) params = new NamedList();
      updateRequest.setParams(new ModifiableSolrParams(params.toSolrParams()));
      if (handler == null) return super.readIterator(fis);

      Object o = null;
      super.readStringAsCharSeq = JavaBinUpdateRequestCodec.this.readStringAsCharSeq;
      try {
        while (true) {
          if (o == null) {
            o = readVal(fis);
          }

          if (o == END_OBJ) {
            break;
          }

          // Per-document values: scoped to this iteration so that an entry without params
          // does not inherit commitWithin/overwrite from a previous entry.
          Integer commitWithin = null;
          Boolean overwrite = null;
          SolrInputDocument sdoc = null;
          if (o instanceof List) {
            sdoc = listToSolrInputDocument((List<NamedList>) o);
          } else if (o instanceof NamedList) {
            UpdateRequest req = new UpdateRequest();
            req.setParams(new ModifiableSolrParams(((NamedList) o).toSolrParams()));
            handler.update(null, req, null, null);
          } else if (o instanceof Map.Entry) {
            sdoc = (SolrInputDocument) ((Entry) o).getKey();
            Map p = (Map) ((Entry) o).getValue();
            if (p != null) {
              commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
              overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
            }
          } else if (o instanceof SolrInputDocument) {
            sdoc = (SolrInputDocument) o;
          } else if (o instanceof Map) {
            sdoc = convertMapToSolrInputDoc((Map) o);
          }

          // peek at the next object to see if we're at the end
          o = readVal(fis);
          if (o == END_OBJ) {
            // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
            updateRequest.lastDocInBatch();
          }

          handler.update(sdoc, updateRequest, commitWithin, overwrite);
        }
        return Collections.emptyList();
      } finally {
        super.readStringAsCharSeq = false;
      }
    }

    /**
     * Converts a Map-encoded document into a SolrInputDocument. Entries whose key is {@code CHILDDOC} are treated as
     * child documents (a single Map or a List of Maps; other element types are skipped); all other entries become
     * fields.
     */
    @SuppressWarnings({"unchecked"})
    private SolrInputDocument convertMapToSolrInputDoc(@SuppressWarnings({"rawtypes"})Map m) {
      SolrInputDocument result = createSolrInputDocument(m.size());
      m.forEach((k, v) -> {
        if (CHILDDOC.equals(k.toString())) {
          if (v instanceof List) {
            @SuppressWarnings({"rawtypes"})
            List list = (List) v;
            for (Object o : list) {
              if (o instanceof Map) {
                result.addChildDocument(convertMapToSolrInputDoc((Map) o));
              }
            }
          } else if (v instanceof Map) {
            result.addChildDocument(convertMapToSolrInputDoc((Map) v));
          }
        } else {
          result.addField(k.toString(), v);
        }
      });
      return result;
    }
  }
}