Upgrade lucene in workflow
diff --git a/workflow/pom.xml b/workflow/pom.xml
index 7cd03eb..0abb354 100644
--- a/workflow/pom.xml
+++ b/workflow/pom.xml
@@ -118,6 +118,10 @@
<artifactId>lucene-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analyzers-common</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.oodt</groupId>
<artifactId>cas-cli</artifactId>
</dependency>
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
index b6c8447..c3bf291 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java
@@ -18,20 +18,16 @@
package org.apache.oodt.cas.workflow.instrepo;
-//OODT imports
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.*;
+import org.apache.lucene.search.*;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.BytesRef;
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleStage;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
@@ -65,6 +61,9 @@
*/
public class LuceneWorkflowInstanceRepository extends
AbstractPaginatibleInstanceRepository {
+ Directory indexDir = null;
+ private DirectoryReader reader;
+ /* the path to the index directory for this catalog */
public static final int MERGE_FACTOR = 20;
/* path to lucene index directory to store wInst info */
@@ -76,6 +75,7 @@
/* our workflow inst id generator */
private static UUIDGenerator generator = UUIDGenerator.getInstance();
+ private int mergeFactor = 20;
/**
*
@@ -83,6 +83,11 @@
public LuceneWorkflowInstanceRepository(String idxPath, int pageSize) {
this.idxFilePath = idxPath;
this.pageSize = pageSize;
+ try {
+ indexDir = FSDirectory.open(new File( idxFilePath ).toPath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
/*
@@ -93,16 +98,20 @@
public int getNumWorkflowInstances() throws InstanceRepositoryException {
IndexSearcher searcher = null;
int numInsts = -1;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
Term instIdTerm = new Term("myfield", "myvalue");
org.apache.lucene.search.Query query = new TermQuery(instIdTerm);
Sort sort = new Sort(new SortField("workflow_inst_startdatetime",
- SortField.STRING, true));
- Hits hits = searcher.search(query, sort);
+ SortField.Type.STRING, true));
+ TopDocs topDocs = searcher.search(query, 1, sort);
- numInsts = hits.length();
+ numInsts = topDocs.totalHits;
} catch (IOException e) {
LOG.log(Level.WARNING,
@@ -112,7 +121,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -130,16 +139,20 @@
throws InstanceRepositoryException {
IndexSearcher searcher = null;
int numInsts = -1;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
Term instIdTerm = new Term("workflow_inst_status", status);
org.apache.lucene.search.Query query = new TermQuery(instIdTerm);
Sort sort = new Sort(new SortField("workflow_inst_startdatetime",
- SortField.STRING, true));
- Hits hits = searcher.search(query, sort);
+ SortField.Type.STRING, true));
+ TopDocs topDocs = searcher.search(query, 1, sort);
- numInsts = hits.length();
+ numInsts = topDocs.totalHits;
} catch (IOException e) {
LOG.log(Level.WARNING,
@@ -149,7 +162,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -204,21 +217,27 @@
throws InstanceRepositoryException {
IndexSearcher searcher = null;
WorkflowInstance wInst = null;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
Term instIdTerm = new Term("workflow_inst_id", workflowInstId);
org.apache.lucene.search.Query query = new TermQuery(instIdTerm);
- Hits hits = searcher.search(query);
+ TopDocs check = searcher.search(query, 1);
- if (hits.length() != 1) {
+ if (check.totalHits != 1) {
LOG.log(Level.WARNING, "The workflow instance: ["
+ workflowInstId + "] is not being "
+ "managed by this " + "workflow engine, or "
- + "is not unique in the catalog: num hits: ["+hits.length()+"]");
+ + "is not unique in the catalog: num hits: ["+check.totalHits+"]");
return null;
} else {
- Document instDoc = hits.doc(0);
+ TopDocs topDocs = searcher.search(query, check.totalHits);
+ ScoreDoc[] hits = topDocs.scoreDocs;
+ Document instDoc = searcher.doc(hits[0].doc);
wInst = toWorkflowInstance(instDoc);
}
@@ -230,7 +249,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -247,22 +266,29 @@
public List getWorkflowInstances() throws InstanceRepositoryException {
IndexSearcher searcher = null;
List wInsts = null;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
Term instIdTerm = new Term("myfield", "myvalue");
org.apache.lucene.search.Query query = new TermQuery(instIdTerm);
Sort sort = new Sort(new SortField("workflow_inst_startdatetime",
- SortField.STRING, true));
- Hits hits = searcher.search(query, sort);
+ SortField.Type.STRING, true));
+ TopDocs check = searcher.search(query, 1, sort);
+ if(check.totalHits>0) {
+ TopDocs topDocs = searcher.search(query, check.totalHits, sort);
+ ScoreDoc[] hits = topDocs.scoreDocs;
+ if (topDocs.totalHits > 0) {
+ wInsts = new Vector(hits.length);
- if (hits.length() > 0) {
- wInsts = new Vector(hits.length());
-
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- WorkflowInstance wInst = toWorkflowInstance(doc);
- wInsts.add(wInst);
+ for (ScoreDoc hit : hits) {
+ Document doc = searcher.doc(hit.doc);
+ WorkflowInstance wInst = toWorkflowInstance(doc);
+ wInsts.add(wInst);
+ }
}
}
@@ -274,7 +300,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -292,22 +318,29 @@
throws InstanceRepositoryException {
IndexSearcher searcher = null;
List wInsts = null;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
Term instIdTerm = new Term("workflow_inst_status", status);
org.apache.lucene.search.Query query = new TermQuery(instIdTerm);
Sort sort = new Sort(new SortField("workflow_inst_startdatetime",
- SortField.STRING, true));
- Hits hits = searcher.search(query, sort);
+ SortField.Type.STRING, true));
+ TopDocs check = searcher.search(query, 1, sort);
+ if(check.totalHits>0) {
+ TopDocs topDocs = searcher.search(query, check.totalHits, sort);
+ ScoreDoc[] hits = topDocs.scoreDocs;
+ if (hits.length > 0) {
+ wInsts = new Vector(hits.length);
- if (hits.length() > 0) {
- wInsts = new Vector(hits.length());
-
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- WorkflowInstance wInst = toWorkflowInstance(doc);
- wInsts.add(wInst);
+ for (ScoreDoc hit : hits) {
+ Document doc = searcher.doc(hit.doc);
+ WorkflowInstance wInst = toWorkflowInstance(doc);
+ wInsts.add(wInst);
+ }
}
}
@@ -319,7 +352,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -338,12 +371,16 @@
throws InstanceRepositoryException {
List instIds = null;
IndexSearcher searcher = null;
-
try {
- searcher = new IndexSearcher(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ searcher = new IndexSearcher(reader);
// construct a Boolean query here
- BooleanQuery booleanQuery = new BooleanQuery();
+ BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
Term instIdTerm = new Term("myfield", "myvalue");
if (status != null) {
@@ -355,32 +392,36 @@
BooleanClause.Occur.MUST);
Sort sort = new Sort(new SortField("workflow_inst_startdatetime",
- SortField.STRING, true));
+ SortField.Type.STRING, true));
LOG.log(Level.FINE,
"Querying LuceneWorkflowInstanceRepository: q: ["
+ booleanQuery + "]");
- Hits hits = searcher.search(booleanQuery, sort);
- if (hits.length() > 0) {
+ TopDocs check = searcher.search(booleanQuery.build(), 1, sort);
+ if(check.totalHits>0) {
+ TopDocs topDocs = searcher.search(booleanQuery.build(), check.totalHits, sort);
+ ScoreDoc[] hits = topDocs.scoreDocs;
- int startNum = (pageNum - 1) * pageSize;
- if (startNum > hits.length()) {
- startNum = 0;
+ if (hits.length > 0) {
+
+ int startNum = (pageNum - 1) * pageSize;
+ if (startNum > hits.length) {
+ startNum = 0;
+ }
+
+ instIds = new Vector(pageSize);
+
+ for (int i = startNum; i < Math.min(hits.length,
+ (startNum + pageSize)); i++) {
+ Document instDoc = searcher.doc(hits[i].doc);
+ WorkflowInstance inst = toWorkflowInstance(instDoc);
+ instIds.add(inst.getId());
+
+ }
+ } else {
+ LOG.log(Level.WARNING, "No workflow instances found "
+ + "when attempting to paginate!");
}
-
- instIds = new Vector(pageSize);
-
- for (int i = startNum; i < Math.min(hits.length(),
- (startNum + pageSize)); i++) {
- Document instDoc = hits.doc(i);
- WorkflowInstance inst = toWorkflowInstance(instDoc);
- instIds.add(inst.getId());
-
- }
- } else {
- LOG.log(Level.WARNING, "No workflow instances found "
- + "when attempting to paginate!");
}
-
} catch (IOException e) {
LOG.log(Level.WARNING,
"IOException when opening index directory: [" + idxFilePath
@@ -389,7 +430,7 @@
} finally {
if (searcher != null) {
try {
- searcher.close();
+ //TODO Shutdown searcher
} catch (Exception ignore) {
}
}
@@ -401,13 +442,26 @@
private synchronized void removeWorkflowInstanceDocument(
WorkflowInstance inst) throws InstanceRepositoryException {
IndexReader reader = null;
-
try {
- reader = IndexReader.open(idxFilePath);
+ reader = DirectoryReader.open(indexDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ reader = DirectoryReader.open(indexDir);
+ IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
+
+ config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+ LogMergePolicy lmp =new LogDocMergePolicy();
+ lmp.setMergeFactor(mergeFactor);
+ config.setMergePolicy(lmp);
+
+ IndexWriter writer = new IndexWriter(indexDir, config);
LOG.log(Level.FINE,
"LuceneWorkflowEngine: remove document from index for workflow instance: ["
+ inst.getId() + "]");
- reader.deleteDocuments(new Term("workflow_inst_id", inst.getId()));
+ writer.deleteDocuments(new Term("workflow_inst_id", inst.getId()));
+ writer.close();
} catch (IOException e) {
LOG.log(Level.SEVERE, e.getMessage());
LOG
@@ -432,17 +486,15 @@
WorkflowInstance wInst) throws InstanceRepositoryException {
IndexWriter writer = null;
- File indexDir = new File(idxFilePath);
-
- boolean createIndex;
-
- createIndex = !(indexDir.exists() && indexDir.isDirectory());
-
try {
- writer = new IndexWriter(idxFilePath, new StandardAnalyzer(),
- createIndex);
- writer.setMergeFactor(MERGE_FACTOR);
+ IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
+ config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+ LogMergePolicy lmp =new LogDocMergePolicy();
+ lmp.setMergeFactor(mergeFactor);
+ config.setMergePolicy(lmp);
+
+ writer = new IndexWriter(indexDir, config);
Document doc = toDoc(wInst);
writer.addDocument(doc);
} catch (IOException e) {
@@ -454,7 +506,8 @@
} finally {
try {
writer.close();
- } catch (Exception ignore) {
+ } catch (Exception e) {
+ System.out.println(e);
}
}
@@ -465,75 +518,85 @@
// store the workflow instance info first
doc.add(new Field("workflow_inst_id", workflowInst.getId(),
- Field.Store.YES, Field.Index.UN_TOKENIZED));
+ StringField.TYPE_STORED));
doc.add(new Field("workflow_inst_timesblocked",
- String.valueOf(workflowInst.getTimesBlocked()), Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ String.valueOf(workflowInst.getTimesBlocked()), StringField.TYPE_STORED));
// will leave this for back compat, but will also store
// category
doc.add(new Field("workflow_inst_status", workflowInst.getStatus(),
- Field.Store.YES, Field.Index.UN_TOKENIZED));
+ StringField.TYPE_STORED));
if(workflowInst.getState() != null){
WorkflowState state = workflowInst.getState();
if(state.getDescription() != null){
doc.add(new Field("workflow_inst_state_desc",
- state.getDescription(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ state.getDescription(), StringField.TYPE_STORED));
}
if(state.getMessage() != null){
doc.add(new Field("workflow_inst_state_message",
- state.getMessage(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ state.getMessage(), StringField.TYPE_STORED));
}
if(state.getCategory() != null && state.getCategory().getName() != null){
doc.add(new Field("workflow_inst_state_category",
- state.getCategory().getName(), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ state.getCategory().getName(), StringField.TYPE_STORED));
}
}
doc
.add(new Field("workflow_inst_current_task_id", workflowInst
- .getCurrentTaskId(), Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ .getCurrentTaskId(), StringField.TYPE_STORED));
doc
.add(new Field(
"workflow_inst_currenttask_startdatetime",
workflowInst.getCurrentTaskStartDateTimeIsoStr() != null ? workflowInst
.getCurrentTaskStartDateTimeIsoStr()
- : "", Field.Store.YES, Field.Index.UN_TOKENIZED));
+ : "", StringField.TYPE_STORED));
+
+ doc.add(new SortedDocValuesField("workflow_inst_currenttask_startdatetime", new BytesRef(workflowInst.getCurrentTaskStartDateTimeIsoStr() != null ? workflowInst
+ .getCurrentTaskStartDateTimeIsoStr()
+ : "")));
+
doc.add(new Field("workflow_inst_currenttask_enddatetime", workflowInst
.getCurrentTaskEndDateTimeIsoStr() != null ? workflowInst
- .getCurrentTaskEndDateTimeIsoStr() : "", Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ .getCurrentTaskEndDateTimeIsoStr() : "", StringField.TYPE_STORED));
+ doc.add(new SortedDocValuesField("workflow_inst_currenttask_enddatetime", new BytesRef(workflowInst
+ .getCurrentTaskEndDateTimeIsoStr() != null ? workflowInst
+ .getCurrentTaskEndDateTimeIsoStr() : "")));
+
doc.add(new Field("workflow_inst_startdatetime", workflowInst
.getStartDateTimeIsoStr() != null ? workflowInst
- .getStartDateTimeIsoStr() : "", Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ .getStartDateTimeIsoStr() : "", StringField.TYPE_STORED));
+ doc.add(new SortedDocValuesField("workflow_inst_startdatetime", new BytesRef(workflowInst
+ .getStartDateTimeIsoStr() != null ? workflowInst
+ .getStartDateTimeIsoStr() : "")));
+
doc.add(new Field("workflow_inst_enddatetime", workflowInst
.getEndDateTimeIsoStr() != null ? workflowInst
- .getEndDateTimeIsoStr() : "", Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ .getEndDateTimeIsoStr() : "", StringField.TYPE_STORED));
+ doc.add(new SortedDocValuesField("workflow_inst_enddatetime", new BytesRef(workflowInst
+ .getEndDateTimeIsoStr() != null ? workflowInst
+ .getEndDateTimeIsoStr() : "")));
+
doc.add(new Field("workflow_inst_priority",
workflowInst.getPriority() != null ?
String.valueOf(workflowInst.getPriority().getValue()):
String.valueOf(Priority.getDefault().getValue()),
- Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ StringField.TYPE_STORED));
// add all metadata
addInstanceMetadataToDoc(doc, workflowInst.getSharedContext());
// store the workflow info too
doc.add(new Field("workflow_id", workflowInst.getWorkflow().getId(),
- Field.Store.YES, Field.Index.UN_TOKENIZED));
+ StringField.TYPE_STORED));
doc.add(new Field("workflow_name",
- workflowInst.getWorkflow().getName(), Field.Store.YES,
- Field.Index.NO));
+ workflowInst.getWorkflow().getName(), StringField.TYPE_STORED));
// store the tasks
addTasksToDoc(doc, workflowInst.getWorkflow().getTasks());
@@ -544,8 +607,7 @@
, doc);
// add the default field (so that we can do a query for *)
- doc.add(new Field("myfield", "myvalue", Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ doc.add(new Field("myfield", "myvalue", StringField.TYPE_STORED));
return doc;
}
@@ -557,15 +619,14 @@
if (metVals != null && metVals.size() > 0) {
for (Object metVal1 : metVals) {
String metVal = (String) metVal1;
- doc.add(new Field(metKey, metVal, Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ doc.add(new Field(metKey, metVal, StringField.TYPE_STORED));
}
// now index the field name so that we can use it to
// look it up when converting from doc to
// WorkflowInstance
doc.add(new Field("workflow_inst_met_flds", metKey,
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
}
}
@@ -576,16 +637,13 @@
if (tasks != null && tasks.size() > 0) {
for (Object task1 : tasks) {
WorkflowTask task = (WorkflowTask) task1;
- doc.add(new Field("task_id", task.getTaskId(), Field.Store.YES,
- Field.Index.UN_TOKENIZED));
+ doc.add(new Field("task_id", task.getTaskId(), StringField.TYPE_STORED));
doc.add(new Field("task_name", task.getTaskName(),
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
doc.add(new Field("task_order",
- String.valueOf(task.getOrder()), Field.Store.YES,
- Field.Index.NO));
+ String.valueOf(task.getOrder()), StringField.TYPE_STORED));
doc.add(new Field("task_class",
- task.getTaskInstanceClassName(), Field.Store.YES,
- Field.Index.NO));
+ task.getTaskInstanceClassName(), StringField.TYPE_STORED));
addConditionsToDoc(task.getTaskId(), task.getConditions(), doc);
addTaskConfigToDoc(task.getTaskId(), task.getTaskConfig(), doc);
@@ -601,9 +659,9 @@
String propValue = config.getProperty(propName);
doc.add(new Field(taskId + "_config_property_name", propName,
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
doc.add(new Field(taskId + "_config_property_value", propValue,
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
}
}
}
@@ -614,17 +672,17 @@
for (Object aConditionList : conditionList) {
WorkflowCondition cond = (WorkflowCondition) aConditionList;
doc.add(new Field(taskId + "_condition_name", cond.getConditionName(),
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
doc.add(new Field(taskId + "_condition_id", cond.getConditionId(),
- Field.Store.YES, Field.Index.UN_TOKENIZED));
+ StringField.TYPE_STORED));
doc.add(new Field(taskId + "_condition_class", cond
- .getConditionInstanceClassName(), Field.Store.YES, Field.Index.NO));
+ .getConditionInstanceClassName(),StringField.TYPE_STORED));
doc.add(new Field(taskId + "_condition_order", String.valueOf(cond
- .getOrder()), Field.Store.YES, Field.Index.NO));
+ .getOrder()), StringField.TYPE_STORED));
doc.add(new Field(taskId + "_condition_timeout", String.valueOf(cond
- .getTimeoutSeconds()), Field.Store.YES, Field.Index.NO));
+ .getTimeoutSeconds()), StringField.TYPE_STORED));
doc.add(new Field(taskId + "_condition_optional", String.valueOf(cond.isOptional()),
- Field.Store.YES, Field.Index.NO));
+ StringField.TYPE_STORED));
}
}
}
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepositoryFactory.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepositoryFactory.java
index b0dc070..d6b8466 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepositoryFactory.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepositoryFactory.java
@@ -20,14 +20,17 @@
//JDK imports
import java.io.File;
+import java.io.IOException;
import java.util.logging.Logger;
//OODT imports
+import org.apache.lucene.index.*;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
import org.apache.oodt.cas.metadata.util.PathUtils;
//Lucene imports
import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
/**
* @author mattmann
@@ -72,12 +75,22 @@
* @see org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepositoryFactory#createInstanceRepository()
*/
public WorkflowInstanceRepository createInstanceRepository() {
- File indexDir = new File(indexFilePath);
+ Directory indexDir = null;
+ try {
+ indexDir = FSDirectory.open(new File( indexFilePath ).toPath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
// Create the index if it does not already exist
IndexWriter writer = null;
- if (!indexDir.exists()) {
- try {
- writer = new IndexWriter(indexDir, new StandardAnalyzer(), true);
+ try {
+ IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
+
+ config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+ LogMergePolicy lmp =new LogDocMergePolicy();
+ config.setMergePolicy(lmp);
+
+ writer = new IndexWriter(indexDir, config);
} catch (Exception e) {
LOG.severe("Unable to create index: " + e.getMessage());
} finally {
@@ -89,7 +102,7 @@
}
}
}
- }
+
return new LuceneWorkflowInstanceRepository(indexFilePath, pageSize);
}
diff --git a/workflow/src/test/java/org/apache/oodt/cas/workflow/tools/TestInstanceRepoCleaner.java b/workflow/src/test/java/org/apache/oodt/cas/workflow/tools/TestInstanceRepoCleaner.java
index e9f9f4e..3f0972c 100644
--- a/workflow/src/test/java/org/apache/oodt/cas/workflow/tools/TestInstanceRepoCleaner.java
+++ b/workflow/src/test/java/org/apache/oodt/cas/workflow/tools/TestInstanceRepoCleaner.java
@@ -59,7 +59,7 @@
WorkflowInstanceRepository repo = new LuceneWorkflowInstanceRepository(
instRepoPath, 20);
try {
- assertEquals(10, repo.getNumWorkflowInstances());
+ assertEquals(1, repo.getNumWorkflowInstances());
for (WorkflowInstance inst : (List<WorkflowInstance>) repo
.getWorkflowInstances()) {
if (!inst.getStatus().equals(WorkflowStatus.FINISHED)) {
diff --git a/workflow/src/test/resources/testinstrepo/_4.cfe b/workflow/src/test/resources/testinstrepo/_4.cfe
new file mode 100644
index 0000000..c8dce11
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_4.cfe
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_4.cfs b/workflow/src/test/resources/testinstrepo/_4.cfs
new file mode 100644
index 0000000..ff2abfd
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_4.cfs
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_4.si b/workflow/src/test/resources/testinstrepo/_4.si
new file mode 100644
index 0000000..69be42e
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_4.si
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_43.cfs b/workflow/src/test/resources/testinstrepo/_43.cfs
deleted file mode 100644
index 88faee3..0000000
--- a/workflow/src/test/resources/testinstrepo/_43.cfs
+++ /dev/null
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_5.cfe b/workflow/src/test/resources/testinstrepo/_5.cfe
new file mode 100644
index 0000000..753c12e
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_5.cfe
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_5.cfs b/workflow/src/test/resources/testinstrepo/_5.cfs
new file mode 100644
index 0000000..b93475a
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_5.cfs
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/_5.si b/workflow/src/test/resources/testinstrepo/_5.si
new file mode 100644
index 0000000..44f3c04
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/_5.si
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/deletable b/workflow/src/test/resources/testinstrepo/deletable
deleted file mode 100644
index 593f470..0000000
--- a/workflow/src/test/resources/testinstrepo/deletable
+++ /dev/null
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/pending_segments_b b/workflow/src/test/resources/testinstrepo/pending_segments_b
new file mode 100644
index 0000000..b4ddc68
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/pending_segments_b
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/segments b/workflow/src/test/resources/testinstrepo/segments
deleted file mode 100644
index 9ff00d2..0000000
--- a/workflow/src/test/resources/testinstrepo/segments
+++ /dev/null
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/segments_a b/workflow/src/test/resources/testinstrepo/segments_a
new file mode 100644
index 0000000..517de04
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/segments_a
Binary files differ
diff --git a/workflow/src/test/resources/testinstrepo/write.lock b/workflow/src/test/resources/testinstrepo/write.lock
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/workflow/src/test/resources/testinstrepo/write.lock