/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.solr.handler.dataimport.config.DIHConfiguration;
import org.apache.solr.handler.dataimport.config.Entity;
import org.apache.solr.handler.dataimport.config.EntityField;
import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * <p>{@link DocBuilder} is responsible for creating Solr documents from the given configuration. It also maintains
 * import statistics. It depends on the {@link EntityProcessor} implementations to fetch data.</p>
 * <p>
 * <b>This API is experimental and subject to change</b>
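 * <p>
 * A {@code DocBuilder} is created per import request by {@link DataImporter}, which drives it through
 * {@link #execute()}; typically the request arrives via the {@code /dataimport} handler with
 * {@code command=full-import} or {@code command=delta-import}.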
*
* @since solr 1.3
*/
public class DocBuilder {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();
private static final Date EPOCH = new Date(0);
public static final String DELETE_DOC_BY_ID = "$deleteDocById";
public static final String DELETE_DOC_BY_QUERY = "$deleteDocByQuery";
public static final String DOC_BOOST = "$docBoost";
public static final String SKIP_DOC = "$skipDoc";
public static final String SKIP_ROW = "$skipRow";
DataImporter dataImporter;
private DIHConfiguration config;
private EntityProcessorWrapper currentEntityProcessorWrapper;
@SuppressWarnings({"unchecked", "rawtypes"})
private Map statusMessages = Collections.synchronizedMap(new LinkedHashMap());
public Statistics importStatistics = new Statistics();
DIHWriter writer;
boolean verboseDebug = false;
Map<String, Object> session = new HashMap<>();
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<>();
private Map<String, Object> persistedProperties;
private DIHProperties propWriter;
private DebugLogger debugLogger;
private final RequestInfo reqParams;
public DocBuilder(DataImporter dataImporter, DIHWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
INSTANCE.set(this);
this.dataImporter = dataImporter;
this.reqParams = reqParams;
this.propWriter = propWriter;
DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
persistedProperties = propWriter.readIndexerProperties();
writer = solrWriter;
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
if (writer != null) {
writer.init(ctx);
}
}
DebugLogger getDebugLogger(){
if (debugLogger == null) {
debugLogger = new DebugLogger();
}
return debugLogger;
}
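  /**
   * Builds a {@link VariableResolver} seeded with the registered evaluators, the persisted
   * {@code last_index_time} values (falling back to the epoch on a first run), the raw request
   * parameters and a namespace per entity, registered under both the short and the long importer
   * namespaces.
   */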
private VariableResolver getVariableResolver() {
try {
VariableResolver resolver = null;
String epoch = propWriter.convertDateToString(EPOCH);
if(dataImporter != null && dataImporter.getCore() != null
&& dataImporter.getCore().getCoreDescriptor().getSubstitutableProperties() != null){
resolver = new VariableResolver(dataImporter.getCore().getCoreDescriptor().getSubstitutableProperties());
} else {
resolver = new VariableResolver();
}
resolver.setEvaluators(dataImporter.getEvaluators());
Map<String, Object> indexerNamespace = new HashMap<>();
if (persistedProperties.get(LAST_INDEX_TIME) != null) {
indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
} else {
// set epoch
indexerNamespace.put(LAST_INDEX_TIME, epoch);
}
indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
indexerNamespace.put("request", new HashMap<>(reqParams.getRawParams()));
indexerNamespace.put("handlerName", dataImporter.getHandlerName());
for (Entity entity : dataImporter.getConfig().getEntities()) {
Map<String, Object> entityNamespace = new HashMap<>();
String key = SolrWriter.LAST_INDEX_KEY;
Object lastIndex = persistedProperties.get(entity.getName() + "." + key);
if (lastIndex != null) {
entityNamespace.put(SolrWriter.LAST_INDEX_KEY, lastIndex);
} else {
entityNamespace.put(SolrWriter.LAST_INDEX_KEY, epoch);
}
indexerNamespace.put(entity.getName(), entityNamespace);
}
resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
resolver.addNamespace(ConfigNameConstants.IMPORTER_NS, indexerNamespace);
return resolver;
} catch (Exception e) {
wrapAndThrow(SEVERE, e);
// unreachable statement
return null;
}
}
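  /**
   * Instantiates the given {@link EventListener} class by name and notifies it with the current
   * process context and, when available, the last exception.
   */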
private void invokeEventListener(String className) {
invokeEventListener(className, null);
}
private void invokeEventListener(String className, Exception lastException) {
try {
@SuppressWarnings({"unchecked"})
EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).getConstructor().newInstance();
notifyListener(listener, lastException);
} catch (Exception e) {
wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
}
}
private void notifyListener(EventListener listener, Exception lastException) {
String currentProcess;
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
currentProcess = Context.DELTA_DUMP;
} else {
currentProcess = Context.FULL_DUMP;
}
ContextImpl ctx = new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this);
ctx.setLastException(lastException);
listener.onEvent(ctx);
}
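  /**
   * Runs the whole import: publishes live status messages, fires the {@code onImportStart} and
   * {@code onImportEnd} listeners, performs a full or delta dump per root entity, and finally
   * commits (or records an abort) and closes the writer. A failure while closing the writer is
   * rethrown so the run is reported as unsuccessful.
   */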
@SuppressWarnings("unchecked")
public void execute() {
List<EntityProcessorWrapper> epwList = null;
try {
dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
config = dataImporter.getConfig();
final AtomicLong startTime = new AtomicLong(System.nanoTime());
statusMessages.put(TIME_ELAPSED, new Object() {
@Override
public String toString() {
return getTimeElapsedSince(startTime.get());
}
});
statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
importStatistics.queryCount);
statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
importStatistics.rowsCount);
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
importStatistics.docCount);
statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
importStatistics.skipDocCount);
List<String> entities = reqParams.getEntitiesToRun();
// Trigger onImportStart
if (config.getOnImportStart() != null) {
invokeEventListener(config.getOnImportStart());
}
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
Map<String,Object> lastIndexTimeProps = new HashMap<>();
lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
epwList = new ArrayList<>(config.getEntities().size());
for (Entity e : config.getEntities()) {
epwList.add(getEntityProcessorWrapper(e));
}
for (EntityProcessorWrapper epw : epwList) {
if (entities != null && !entities.contains(epw.getEntity().getName()))
continue;
lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
currentEntityProcessorWrapper = epw;
String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
cleanByQuery(delQuery, fullCleanDone);
doDelta();
delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
}
} else {
cleanByQuery(delQuery, fullCleanDone);
doFullDump();
delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
}
}
}
if (stop.get()) {
        // Don't commit if aborted using command=abort
statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
handleError("Aborted", null);
} else {
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
if (!reqParams.isClean()) {
if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
finish(lastIndexTimeProps);
}
} else {
// Finished operation normally, commit now
finish(lastIndexTimeProps);
}
if (config.getOnImportEnd() != null) {
invokeEventListener(config.getOnImportEnd());
}
}
statusMessages.remove(TIME_ELAPSED);
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
if(importStatistics.failedDocCount.get() > 0)
statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
statusMessages.put("Time taken", getTimeElapsedSince(startTime.get()));
if (log.isInfoEnabled()) {
log.info("Time taken = {}", getTimeElapsedSince(startTime.get()));
}
    } catch (Exception e) {
      throw new RuntimeException(e);
} finally {
// Cannot use IOUtils.closeQuietly since DIH relies on exceptions bubbling out of writer.close() to indicate
// success/failure of the run.
RuntimeException raisedDuringClose = null;
try {
if (writer != null) {
writer.close();
}
} catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn("Exception encountered while closing DIHWriter " + writer + "; temporarily suppressing to ensure other DocBuilder elements are closed", e); // logOk
}
raisedDuringClose = e;
}
if (epwList != null) {
closeEntityProcessorWrappers(epwList);
}
if(reqParams.isDebug()) {
reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
}
if (raisedDuringClose != null) {
throw raisedDuringClose;
}
}
}
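  /** Quietly closes each wrapper, its datasource and, recursively, its children. */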
private void closeEntityProcessorWrappers(List<EntityProcessorWrapper> epwList) {
for(EntityProcessorWrapper epw : epwList) {
IOUtils.closeQuietly(epw);
if(epw.getDatasource() != null) {
IOUtils.closeQuietly(epw.getDatasource());
}
closeEntityProcessorWrappers(epw.getChildren());
}
}
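  /**
   * Records the completion status, commits (and optionally optimizes) when requested, and
   * persists the {@code last_index_time} properties that subsequent delta imports rely on.
   */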
@SuppressWarnings("unchecked")
private void finish(Map<String,Object> lastIndexTimeProps) {
log.info("Import completed successfully");
statusMessages.put("", "Indexing completed. Added/Updated: "
+ importStatistics.docCount + " documents. Deleted "
+ importStatistics.deletedDocCount + " documents.");
if(reqParams.isCommit()) {
writer.commit(reqParams.isOptimize());
addStatusMessage("Committed");
if (reqParams.isOptimize())
addStatusMessage("Optimized");
}
try {
propWriter.persist(lastIndexTimeProps);
} catch (Exception e) {
log.error("Could not write property file", e);
statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
"Make sure your conf directory is writable");
}
}
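  /**
   * Rolls back the writer (skipped in SolrCloud mode, where rollback is not supported), records
   * the failure in the status messages and fires the {@code onError} listener if one is
   * configured.
   */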
@SuppressWarnings({"unchecked"})
void handleError(String message, Exception e) {
if (!dataImporter.getCore().getCoreContainer().isZooKeeperAware()) {
writer.rollback();
}
statusMessages.put(message, "Indexing error");
addStatusMessage(message);
if ((config != null) && (config.getOnError() != null)) {
invokeEventListener(config.getOnError(), e);
}
}
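  /** Runs a full import of the current root entity. */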
private void doFullDump() {
addStatusMessage("Full Dump Started");
buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
}
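  /**
   * Runs a delta import: collects the primary keys of changed and deleted rows via
   * {@link #collectDelta}, deletes stale documents, then rebuilds one document per remaining key.
   */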
@SuppressWarnings("unchecked")
private void doDelta() {
addStatusMessage("Delta Dump started");
VariableResolver resolver = getVariableResolver();
if (config.getDeleteQuery() != null) {
writer.deleteByQuery(config.getDeleteQuery());
}
addStatusMessage("Identifying Delta");
log.info("Starting delta collection.");
Set<Map<String, Object>> deletedKeys = new HashSet<>();
Set<Map<String, Object>> allPks = collectDelta(currentEntityProcessorWrapper, resolver, deletedKeys);
if (stop.get())
return;
addStatusMessage("Deltas Obtained");
addStatusMessage("Building documents");
    if (!deletedKeys.isEmpty()) {
      // make sure deleted documents are not re-created
      allPks.removeAll(deletedKeys);
      deleteAll(deletedKeys);
    }
deletedKeys = null;
writer.setDeltaKeys(allPks);
statusMessages.put("Total Changed Documents", allPks.size());
VariableResolver vri = getVariableResolver();
Iterator<Map<String, Object>> pkIter = allPks.iterator();
while (pkIter.hasNext()) {
Map<String, Object> map = pkIter.next();
vri.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT + ".delta", map);
buildDocument(vri, null, map, currentEntityProcessorWrapper, true, null);
pkIter.remove();
// check for abort
if (stop.get())
break;
}
if (!stop.get()) {
log.info("Delta Import completed successfully");
}
}
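  /**
   * Deletes the documents identified by the collected deleted-row keys, resolving the key column
   * against the entity's declared pk (or schema pk for non-root entities).
   */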
private void deleteAll(Set<Map<String, Object>> deletedKeys) {
log.info("Deleting stale documents ");
Iterator<Map<String, Object>> iter = deletedKeys.iterator();
while (iter.hasNext()) {
Map<String, Object> map = iter.next();
String keyName = currentEntityProcessorWrapper.getEntity().isDocRoot() ? currentEntityProcessorWrapper.getEntity().getPk() : currentEntityProcessorWrapper.getEntity().getSchemaPk();
Object key = map.get(keyName);
if(key == null) {
keyName = findMatchingPkColumn(keyName, map);
key = map.get(keyName);
}
if(key == null) {
log.warn("no key was available for deleted pk query. keyName = {}", keyName);
continue;
}
writer.deleteDoc(key);
importStatistics.deletedDocCount.incrementAndGet();
iter.remove();
}
}
@SuppressWarnings("unchecked")
public void addStatusMessage(String msg) {
statusMessages.put(msg, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
}
private void resetEntity(EntityProcessorWrapper epw) {
epw.setInitialized(false);
for (EntityProcessorWrapper child : epw.getChildren()) {
resetEntity(child);
}
}
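  /**
   * Builds documents for the given entity and, recursively, its children, ensuring that every
   * entity processor initialized along the way is destroyed and reset afterwards.
   */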
private void buildDocument(VariableResolver vr, DocWrapper doc,
Map<String,Object> pk, EntityProcessorWrapper epw, boolean isRoot,
ContextImpl parentCtx) {
List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<>();
try {
buildDocument(vr, doc, pk, epw, isRoot, parentCtx, entitiesToDestroy);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
entityWrapper.destroy();
}
resetEntity(epw);
}
}
@SuppressWarnings("unchecked")
private void buildDocument(VariableResolver vr, DocWrapper doc,
Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
ContextImpl ctx = new ContextImpl(epw, vr, null,
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
epw.init(ctx);
if (!epw.isInitialized()) {
entitiesToDestroy.add(epw);
epw.setInitialized(true);
}
if (reqParams.getStart() > 0) {
getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
}
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
}
int seenDocCount = 0;
try {
while (true) {
if (stop.get())
return;
if(importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
try {
seenDocCount++;
if (seenDocCount > reqParams.getStart()) {
getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
}
if (verboseDebug && epw.getEntity().isDocRoot()) {
getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
}
if (doc == null && epw.getEntity().isDocRoot()) {
doc = new DocWrapper();
ctx.setDoc(doc);
Entity e = epw.getEntity();
while (e.getParentEntity() != null) {
addFields(e.getParentEntity(), doc, (Map<String, Object>) vr
.resolve(e.getParentEntity().getName()), vr);
e = e.getParentEntity();
}
}
Map<String, Object> arow = epw.nextRow();
if (arow == null) {
break;
}
// Support for start parameter in debug mode
if (epw.getEntity().isDocRoot()) {
if (seenDocCount <= reqParams.getStart())
continue;
if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
log.info("Indexing stopped at docCount = {}", importStatistics.docCount);
break;
}
}
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
}
importStatistics.rowsCount.incrementAndGet();
DocWrapper childDoc = null;
if (doc != null) {
if (epw.getEntity().isChild()) {
childDoc = new DocWrapper();
handleSpecialCommands(arow, childDoc);
addFields(epw.getEntity(), childDoc, arow, vr);
doc.addChildDocument(childDoc);
} else {
handleSpecialCommands(arow, doc);
vr.addNamespace(epw.getEntity().getName(), arow);
addFields(epw.getEntity(), doc, arow, vr);
vr.removeNamespace(epw.getEntity().getName());
}
}
if (epw.getEntity().getChildren() != null) {
vr.addNamespace(epw.getEntity().getName(), arow);
for (EntityProcessorWrapper child : epw.getChildren()) {
if (childDoc != null) {
buildDocument(vr, childDoc,
child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
} else {
buildDocument(vr, doc,
child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
}
}
vr.removeNamespace(epw.getEntity().getName());
}
if (epw.getEntity().isDocRoot()) {
if (stop.get())
return;
if (!doc.isEmpty()) {
boolean result = writer.upload(doc);
if(reqParams.isDebug()) {
reqParams.getDebugInfo().debugDocuments.add(doc);
}
doc = null;
if (result){
importStatistics.docCount.incrementAndGet();
} else {
importStatistics.failedDocCount.incrementAndGet();
}
}
}
} catch (DataImportHandlerException e) {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
}
if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
continue;
}
if (isRoot) {
if (e.getErrCode() == DataImportHandlerException.SKIP) {
importStatistics.skipDocCount.getAndIncrement();
doc = null;
} else {
SolrException.log(log, "Exception while processing: "
+ epw.getEntity().getName() + " document : " + doc, e);
}
if (e.getErrCode() == DataImportHandlerException.SEVERE)
throw e;
} else
throw e;
} catch (Exception t) {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
}
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
} finally {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
if (epw.getEntity().isDocRoot())
getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
}
}
}
} finally {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
}
}
}
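  /**
   * A {@link SolrInputDocument} with an attached session map, letting entity processors share
   * per-document state while the document is being built.
   */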
static class DocWrapper extends SolrInputDocument {
Map<String ,Object> session;
public void setSessionAttribute(String key, Object val){
if(session == null) session = new HashMap<>();
session.put(key, val);
}
public Object getSessionAttribute(String key) {
return session == null ? null : session.get(key);
}
}
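  /**
   * Interprets the special {@code $deleteDocById}, {@code $deleteDocByQuery}, {@code $docBoost},
   * {@code $skipDoc} and {@code $skipRow} row entries; skips are signalled by throwing a
   * {@link DataImportHandlerException} with the corresponding error code.
   */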
private void handleSpecialCommands(Map<String, Object> arow, DocWrapper doc) {
Object value = arow.get(DELETE_DOC_BY_ID);
if (value != null) {
if (value instanceof Collection) {
@SuppressWarnings({"rawtypes"})
Collection collection = (Collection) value;
for (Object o : collection) {
writer.deleteDoc(o.toString());
importStatistics.deletedDocCount.incrementAndGet();
}
} else {
writer.deleteDoc(value);
importStatistics.deletedDocCount.incrementAndGet();
}
}
value = arow.get(DELETE_DOC_BY_QUERY);
if (value != null) {
if (value instanceof Collection) {
@SuppressWarnings({"rawtypes"})
Collection collection = (Collection) value;
for (Object o : collection) {
writer.deleteByQuery(o.toString());
importStatistics.deletedDocCount.incrementAndGet();
}
} else {
writer.deleteByQuery(value.toString());
importStatistics.deletedDocCount.incrementAndGet();
}
}
value = arow.get(DOC_BOOST);
if (value != null) {
String message = "Ignoring document boost: " + value + " as index-time boosts are not supported anymore";
if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
log.warn(message);
} else {
log.debug(message);
}
}
value = arow.get(SKIP_DOC);
if (value != null) {
if (Boolean.parseBoolean(value.toString())) {
throw new DataImportHandlerException(DataImportHandlerException.SKIP,
"Document skipped :" + arow);
}
}
value = arow.get(SKIP_ROW);
if (value != null) {
if (Boolean.parseBoolean(value.toString())) {
throw new DataImportHandlerException(DataImportHandlerException.SKIP_ROW);
}
}
}
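  /**
   * Copies row values onto the document. Columns are first matched against the fields declared in
   * data-config.xml, falling back to a schema or dynamic field of the same name; columns starting
   * with '$' are reserved for special commands and are never indexed.
   */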
@SuppressWarnings("unchecked")
private void addFields(Entity entity, DocWrapper doc,
Map<String, Object> arow, VariableResolver vr) {
for (Map.Entry<String, Object> entry : arow.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value == null) continue;
if (key.startsWith("$")) continue;
Set<EntityField> field = entity.getColNameVsField().get(key);
IndexSchema schema = null == reqParams.getRequest() ? null : reqParams.getRequest().getSchema();
if (field == null && schema != null) {
        // This can be a dynamic field or a field which does not have an entry in data-config (an implicit field)
SchemaField sf = schema.getFieldOrNull(key);
if (sf == null) {
sf = config.getSchemaField(key);
}
if (sf != null) {
addFieldToDoc(entry.getValue(), sf.getName(), sf.multiValued(), doc);
}
        // else do nothing; adding an unknown field could cause the update to fail
} else {
if (field != null) {
for (EntityField f : field) {
String name = f.getName();
boolean multiValued = f.isMultiValued();
boolean toWrite = f.isToWrite();
if(f.isDynamicName()){
name = vr.replaceTokens(name);
SchemaField schemaField = config.getSchemaField(name);
if(schemaField == null) {
toWrite = false;
} else {
multiValued = schemaField.multiValued();
toWrite = true;
}
}
if (toWrite) {
addFieldToDoc(entry.getValue(), name, multiValued, doc);
}
}
}
}
}
}
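  /**
   * Adds a value to the document, flattening collections; for single-valued fields only the first
   * non-null value is kept and an existing value is never overwritten.
   */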
private void addFieldToDoc(Object value, String name, boolean multiValued, DocWrapper doc) {
if (value instanceof Collection) {
@SuppressWarnings({"rawtypes"})
Collection collection = (Collection) value;
if (multiValued) {
for (Object o : collection) {
if (o != null)
doc.addField(name, o);
}
} else {
if (doc.getField(name) == null)
for (Object o : collection) {
if (o != null) {
doc.addField(name, o);
break;
}
}
}
} else if (multiValued) {
if (value != null) {
doc.addField(name, value);
}
} else {
if (doc.getField(name) == null && value != null)
doc.addField(name, value);
}
}
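  /**
   * Wraps the entity's processor, defaulting to {@link SqlEntityProcessor} when none is
   * configured, and recursively wraps all child entities.
   */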
@SuppressWarnings({"unchecked"})
public EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
EntityProcessor entityProcessor = null;
if (entity.getProcessorName() == null) {
entityProcessor = new SqlEntityProcessor();
} else {
try {
        entityProcessor = (EntityProcessor) loadClass(entity.getProcessorName(), dataImporter.getCore())
                .getConstructor().newInstance();
      } catch (Exception e) {
        wrapAndThrow(SEVERE, e,
                "Unable to load EntityProcessor implementation for entity: " + entity.getName());
}
}
EntityProcessorWrapper epw = new EntityProcessorWrapper(entityProcessor, entity, this);
for(Entity e1 : entity.getChildren()) {
epw.getChildren().add(getEntityProcessorWrapper(e1));
}
return epw;
}
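  /**
   * Resolves the declared primary key against the columns actually returned by a deltaQuery,
   * accepting exactly one dotted prefix or suffix match (e.g. column {@code TABLE.ID} for pk
   * {@code ID}) and failing on ambiguity.
   */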
private String findMatchingPkColumn(String pk, Map<String, Object> row) {
if (row.containsKey(pk)) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"deltaQuery returned a row with null for primary key %s", pk));
}
String resolvedPk = null;
for (String columnName : row.keySet()) {
if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) {
if (resolvedPk != null)
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'",
resolvedPk, columnName, pk));
resolvedPk = columnName;
}
}
    if (resolvedPk == null) {
      throw new IllegalArgumentException(String.format(Locale.ROOT,
          "deltaQuery has no column to resolve to declared primary key pk='%s'", pk));
    }
if (log.isInfoEnabled()) {
log.info(String.format(Locale.ROOT,
"Resolving deltaQuery column '%s' to match entity's declared pk '%s'",
resolvedPk, pk));
}
return resolvedPk;
}
  /**
   * <p>Collects the unique keys of all Solr documents for which one or more source tables have changed since the
   * last index time.</p>
   * <p>Note: in this context, the unique key of a Solr document is the primary key of the top-level entity declared
   * in data-config.xml (unless overridden with rootEntity="false").</p>
   *
   * @return the set of primary keys for which Solr documents should be updated
   */
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<Map<String, Object>> collectDelta(EntityProcessorWrapper epw, VariableResolver resolver,
Set<Map<String, Object>> deletedRows) {
//someone called abort
if (stop.get())
return new HashSet();
ContextImpl context1 = new ContextImpl(epw, resolver, null, Context.FIND_DELTA, session, null, this);
epw.init(context1);
Set<Map<String, Object>> myModifiedPks = new HashSet<>();
for (EntityProcessorWrapper childEpw : epw.getChildren()) {
//this ensures that we start from the leaf nodes
myModifiedPks.addAll(collectDelta(childEpw, resolver, deletedRows));
//someone called abort
if (stop.get())
return new HashSet();
}
// identifying the modified rows for this entity
Map<String, Map<String, Object>> deltaSet = new HashMap<>();
if (log.isInfoEnabled()) {
log.info("Running ModifiedRowKey() for Entity: {}", epw.getEntity().getName());
}
//get the modified rows in this entity
String pk = epw.getEntity().getPk();
while (true) {
Map<String, Object> row = epw.nextModifiedRowKey();
if (row == null)
break;
Object pkValue = row.get(pk);
if (pkValue == null) {
pk = findMatchingPkColumn(pk, row);
pkValue = row.get(pk);
}
deltaSet.put(pkValue.toString(), row);
importStatistics.rowsCount.incrementAndGet();
// check for abort
if (stop.get())
return new HashSet();
}
//get the deleted rows for this entity
Set<Map<String, Object>> deletedSet = new HashSet<>();
while (true) {
Map<String, Object> row = epw.nextDeletedRowKey();
if (row == null)
break;
deletedSet.add(row);
Object pkValue = row.get(pk);
if (pkValue == null) {
pk = findMatchingPkColumn(pk, row);
pkValue = row.get(pk);
}
// Remove deleted rows from the delta rows
String deletedRowPk = pkValue.toString();
if (deltaSet.containsKey(deletedRowPk)) {
deltaSet.remove(deletedRowPk);
}
importStatistics.rowsCount.incrementAndGet();
// check for abort
if (stop.get())
return new HashSet();
}
if (log.isInfoEnabled()) {
log.info("Completed ModifiedRowKey for Entity: {} rows obtained: {}", epw.getEntity().getName(), deltaSet.size());
log.info("Completed DeletedRowKey for Entity: {} rows obtained : {}", epw.getEntity().getName(), deletedSet.size()); // logOk
}
myModifiedPks.addAll(deltaSet.values());
Set<Map<String, Object>> parentKeyList = new HashSet<>();
    // all that we have captured is useless (in a sub-entity) if no rows in the parent are modified because of these
    // propagate the changes up the chain
if (epw.getEntity().getParentEntity() != null) {
      // find parent rows affected by the modified rows
for (Map<String, Object> row : myModifiedPks) {
resolver.addNamespace(epw.getEntity().getName(), row);
getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
// check for abort
if (stop.get())
return new HashSet();
}
      // and the same for deleted rows
for (Map<String, Object> row : deletedSet) {
resolver.addNamespace(epw.getEntity().getName(), row);
getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
// check for abort
if (stop.get())
return new HashSet();
}
}
if (log.isInfoEnabled()) {
log.info("Completed parentDeltaQuery for Entity: {}", epw.getEntity().getName());
}
if (epw.getEntity().isDocRoot())
deletedRows.addAll(deletedSet);
    // Do not use entity.isDocRoot here because one of the descendant entities may set rootEntity="true"
return epw.getEntity().getParentEntity() == null ?
myModifiedPks : new HashSet<>(parentKeyList);
}
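  /**
   * Collects the parent rows affected by a modified or deleted child row (the entity's
   * parentDeltaQuery), removing the child's namespace from the resolver when done.
   */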
private void getModifiedParentRows(VariableResolver resolver,
String entity, EntityProcessor entityProcessor,
Set<Map<String, Object>> parentKeyList) {
try {
while (true) {
Map<String, Object> parentRow = entityProcessor
.nextModifiedParentRowKey();
if (parentRow == null)
break;
parentKeyList.add(parentRow);
importStatistics.rowsCount.incrementAndGet();
// check for abort
if (stop.get())
return;
}
} finally {
resolver.removeNamespace(entity);
}
}
public void abort() {
stop.set(true);
}
private AtomicBoolean stop = new AtomicBoolean(false);
public static final String TIME_ELAPSED = "Time Elapsed";
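  /** Formats the time elapsed since the given {@code System.nanoTime()} stamp as hours:minutes:seconds.millis. */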
static String getTimeElapsedSince(long l) {
l = TimeUnit.MILLISECONDS.convert(System.nanoTime() - l, TimeUnit.NANOSECONDS);
return (l / (60000 * 60)) + ":" + (l / 60000) % 60 + ":" + (l / 1000)
% 60 + "." + l % 1000;
}
public RequestInfo getReqParams() {
return reqParams;
}
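  /**
   * Loads a class by name via the core's resource loader (or {@code Class.forName} when no core
   * is available), retrying with this package as a prefix so short names such as
   * {@code SqlEntityProcessor} resolve.
   */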
@SuppressWarnings({"unchecked", "rawtypes"})
static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
try {
return core != null ?
core.getResourceLoader().findClass(name, Object.class) :
Class.forName(name);
} catch (Exception e) {
try {
String n = DocBuilder.class.getPackage().getName() + "." + name;
return core != null ?
core.getResourceLoader().findClass(n, Object.class) :
Class.forName(n);
} catch (Exception e1) {
throw new ClassNotFoundException("Unable to load " + name + " or " + DocBuilder.class.getPackage().getName() + "." + name, e);
}
}
}
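  /** Thread-safe counters for the documents, rows and queries processed during an import. */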
public static class Statistics {
public AtomicLong docCount = new AtomicLong();
public AtomicLong deletedDocCount = new AtomicLong();
public AtomicLong failedDocCount = new AtomicLong();
public AtomicLong rowsCount = new AtomicLong();
public AtomicLong queryCount = new AtomicLong();
public AtomicLong skipDocCount = new AtomicLong();
    public Statistics add(Statistics stats) {
      this.docCount.addAndGet(stats.docCount.get());
      this.deletedDocCount.addAndGet(stats.deletedDocCount.get());
      this.failedDocCount.addAndGet(stats.failedDocCount.get());
      this.rowsCount.addAndGet(stats.rowsCount.get());
      this.queryCount.addAndGet(stats.queryCount.get());
      this.skipDocCount.addAndGet(stats.skipDocCount.get());
      return this;
    }
public Map<String, Object> getStatsSnapshot() {
Map<String, Object> result = new HashMap<>();
result.put("docCount", docCount.get());
result.put("deletedDocCount", deletedDocCount.get());
result.put("rowCount", rowsCount.get());
result.put("queryCount", rowsCount.get());
result.put("skipDocCount", skipDocCount.get());
return result;
}
}
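  /**
   * Applies the token-resolved delete query when {@code clean=true}; with no query configured, a
   * single delete-all ({@code *:*}) is issued at most once per run, guarded by
   * {@code completeCleanDone}.
   */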
private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
delQuery = getVariableResolver().replaceTokens(delQuery);
if (reqParams.isClean()) {
if (delQuery == null && !completeCleanDone.get()) {
writer.doDeleteAll();
completeCleanDone.set(true);
} else if (delQuery != null) {
writer.deleteByQuery(delQuery);
}
}
}
public static final String LAST_INDEX_TIME = "last_index_time";
public static final String INDEX_START_TIME = "index_start_time";
}