blob: 8e0522a465b54db55b6b1deab63cca4fd17f3a47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.dataimport;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* <p>
* An {@link EntityProcessor} instance which provides support for reading from
* databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
* {@link EntityProcessor} if none is specified explicitly in data-config.xml
* </p>
* <p>
* Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
* for more details.
* </p>
* <p>
* <b>This API is experimental and may change in the future.</b>
*
*
* @since solr 1.3
*/
public class SqlEntityProcessor extends EntityProcessorBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected DataSource<Iterator<Map<String, Object>>> dataSource;
@Override
@SuppressWarnings("unchecked")
public void init(Context context) {
super.init(context);
dataSource = context.getDataSource();
}
protected void initQuery(String q) {
try {
DataImporter.QUERY_COUNT.get().incrementAndGet();
rowIterator = dataSource.getData(q);
this.query = q;
} catch (DataImportHandlerException e) {
throw e;
} catch (Exception e) {
log.error( "The query failed '{}'", q, e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
}
}
@Override
public Map<String, Object> nextRow() {
if (rowIterator == null) {
String q = getQuery();
initQuery(context.replaceTokens(q));
}
return getNext();
}
@Override
public Map<String, Object> nextModifiedRowKey() {
if (rowIterator == null) {
String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
if (deltaQuery == null)
return null;
initQuery(context.replaceTokens(deltaQuery));
}
return getNext();
}
@Override
public Map<String, Object> nextDeletedRowKey() {
if (rowIterator == null) {
String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
if (deletedPkQuery == null)
return null;
initQuery(context.replaceTokens(deletedPkQuery));
}
return getNext();
}
@Override
public Map<String, Object> nextModifiedParentRowKey() {
if (rowIterator == null) {
String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
if (parentDeltaQuery == null)
return null;
if (log.isInfoEnabled()) {
log.info("Running parentDeltaQuery for Entity: {}"
, context.getEntityAttribute("name"));
}
initQuery(context.replaceTokens(parentDeltaQuery));
}
return getNext();
}
public String getQuery() {
String queryString = context.getEntityAttribute(QUERY);
if (Context.FULL_DUMP.equals(context.currentProcess())) {
return queryString;
}
if (Context.DELTA_DUMP.equals(context.currentProcess())) {
String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
if(deltaImportQuery != null) return deltaImportQuery;
}
log.warn("'deltaImportQuery' attribute is not specified for entity : {}", entityName);
return getDeltaImportQuery(queryString);
}
public String getDeltaImportQuery(String queryString) {
StringBuilder sb = new StringBuilder(queryString);
if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
sb.append(" and ");
} else {
sb.append(" where ");
}
boolean first = true;
String[] primaryKeys = context.getEntityAttribute("pk").split(",");
for (String primaryKey : primaryKeys) {
if (!first) {
sb.append(" and ");
}
first = false;
Object val = context.resolve("dataimporter.delta." + primaryKey);
if (val == null) {
Matcher m = DOT_PATTERN.matcher(primaryKey);
if (m.find()) {
val = context.resolve("dataimporter.delta." + m.group(1));
}
}
sb.append(primaryKey).append(" = ");
if (val instanceof Number) {
sb.append(val.toString());
} else {
sb.append("'").append(val.toString()).append("'");
}
}
return sb.toString();
}
private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
"^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
public static final String QUERY = "query";
public static final String DELTA_QUERY = "deltaQuery";
public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
public static final String DEL_PK_QUERY = "deletedPkQuery";
public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
}