Add multiprocess support to the Lucene connector via an HDFS-backed index
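
Two processes cannot share a local Lucene index; the second open fails with a
LockObtainFailedException (see testHdfsLock). This change therefore lets the
connector keep its index on HDFS, using Solr's HdfsDirectory/BlockDirectory,
and for HDFS paths LuceneClientManager keys each client on a per-process
sub-directory so that every ManifoldCF agent process writes to its own index.
The path resolution added to LuceneClientManager.getClient():

    String paramPath;
    if (!LuceneClient.useHdfs(path)) {
      paramPath = path;
    } else {
      paramPath = (processID.equals("")) ? path : path + "/" + processID;
    }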

git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-1219@1693798 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/build.xml b/build.xml
index 32937bc..195a531 100644
--- a/build.xml
+++ b/build.xml
@@ -1999,6 +1999,25 @@
             <param name="artifact-name" value="lucene-sandbox"/>
             <param name="artifact-type" value="jar"/>
         </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/apache/solr"/>
+            <param name="artifact-version" value="${lucene.version}"/>
+            <param name="artifact-name" value="solr-core"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="com/googlecode/concurrentlinkedhashmap"/>
+            <param name="artifact-version" value="1.2"/>
+            <param name="artifact-name" value="concurrentlinkedhashmap-lru"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
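+        <!-- The HDFS tests need the full hadoop-minicluster dependency tree; resolve it with
+             Maven's dependency:copy-dependencies into lib/hadoop-minicluster/target/dependency,
+             which the connector's test classpath picks up. -->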
+        <mkdir dir="lib/hadoop-minicluster"/>
+        <copy file="connectors/lucene/hadoop-minicluster.xml" toFile="lib/hadoop-minicluster/pom.xml"/>
+        <exec dir="lib/hadoop-minicluster" executable="mvn">
+            <arg line="dependency:copy-dependencies"/>
+        </exec>
     </target>
 	
     <target name="make-core-deps" depends="download-resteasy,download-jsoup,download-mockito,download-alfresco-webscript-plugin,download-alfresco-indexer-client,download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-woodstox,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-postgresql,download-axis,download-saaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-htrace,download-protobuf,download-tika,download-jackson,download-lucene">
diff --git a/connectors/lucene/build.xml b/connectors/lucene/build.xml
index d93fabf..efb640f 100644
--- a/connectors/lucene/build.xml
+++ b/connectors/lucene/build.xml
@@ -40,6 +40,22 @@
               <include name="lucene-sandbox*.jar"/>
               <include name="guava-*.jar"/>
               <include name="gson*.jar"/>
+              <include name="solr-core*.jar"/>
+              <include name="concurrentlinkedhashmap-lru*.jar"/>
+              <include name="solr-solrj*.jar"/>
+              <include name="hadoop-common*.jar"/>
+              <include name="hadoop-annotations*.jar"/>
+              <include name="hadoop-auth*.jar"/>
+              <include name="hadoop-hdfs*.jar"/>
+              <include name="htrace-core*.jar"/>
+              <include name="protobuf-java*.jar"/>
+        </fileset>
+    </path>
+
+    <path id="connector-test-classpath">
+        <path refid="mcf-connector-build.connector-test-classpath"/>
+        <fileset dir="../../lib/hadoop-minicluster/target/dependency">
+            <include name="*.jar"/>
         </fileset>
     </path>
 
@@ -54,6 +70,15 @@
                 <include name="lucene-sandbox*.jar"/>
                 <include name="guava-*.jar"/>
                 <include name="gson*.jar"/>
+                <include name="solr-core*.jar"/>
+                <include name="concurrentlinkedhashmap-lru*.jar"/>
+                <include name="solr-solrj*.jar"/>
+                <include name="hadoop-common*.jar"/>
+                <include name="hadoop-annotations*.jar"/>
+                <include name="hadoop-auth*.jar"/>
+                <include name="hadoop-hdfs*.jar"/>
+                <include name="htrace-core*.jar"/>
+                <include name="protobuf-java*.jar"/>
             </fileset>
         </copy>
     </target>
diff --git a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClient.java b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClient.java
index 8e3b6fe..001d938 100644
--- a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClient.java
+++ b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClient.java
@@ -20,7 +20,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,6 +27,8 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.analysis.custom.CustomAnalyzer;
@@ -47,6 +48,14 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.NRTCachingDirectory;
+import org.apache.solr.store.blockcache.BlockCache;
+import org.apache.solr.store.blockcache.BlockDirectory;
+import org.apache.solr.store.blockcache.BlockDirectoryCache;
+import org.apache.solr.store.blockcache.BufferStore;
+import org.apache.solr.store.blockcache.Cache;
+import org.apache.solr.store.blockcache.Metrics;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLockFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -60,7 +69,7 @@
 
 public class LuceneClient implements Closeable {
 
-  private final Path path;
+  private final String path;
   private final Map<String,Map<String,Object>> charfiltersInfo;
   private final Map<String,Map<String,Object>> tokenizersInfo;
   private final Map<String,Map<String,Object>> filtersInfo;
@@ -115,7 +124,7 @@
     }
   }
 
-  public LuceneClient(Path path) throws IOException {
+  public LuceneClient(String path) throws IOException {
     this(path,
          LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(),
          LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
@@ -123,7 +132,7 @@
          LuceneClient.defaultMaxDocumentLength());
   }
 
-  public LuceneClient(Path path,
+  public LuceneClient(String path,
                       String charfilters, String tokenizers, String filters,
                       String analyzers, String fields,
                       String idField, String contentField,
@@ -153,10 +162,9 @@
       .setCommitOnClose(IndexWriterConfig.DEFAULT_COMMIT_ON_CLOSE)
       .setRAMBufferSizeMB(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB * 6);
 
-    Directory fsDir = FSDirectory.open(path);
-    NRTCachingDirectory cachedDir = new NRTCachingDirectory(fsDir, 4, 48);
+    Directory dir = initDirectory();
 
-    this.writer = new IndexWriter(cachedDir, config);
+    this.writer = new IndexWriter(dir, config);
 
     initIndex();
 
@@ -251,10 +259,68 @@
     return copy;
   }
 
+  public static boolean useHdfs(String path) {
+    return path.startsWith("hdfs:/");
+  }
+
+  private static BlockCache globalBlockCache;
+
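+  /**
+   * Opens the index directory: a local FSDirectory wrapped in an NRT cache, or, for an
+   * hdfs:/ path, Solr's HdfsDirectory behind an off-heap block cache.
+   */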
+  private Directory initDirectory() throws IOException {
+    Directory directory;
+
+    if (!useHdfs(path)) {
+      Directory fsDir = FSDirectory.open(new File(path).toPath());
+      directory = new NRTCachingDirectory(fsDir, 4, 48);
+    } else {
+      Directory dir;
+
+      Configuration conf = new Configuration();
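+      // Disable Hadoop's FileSystem cache so this directory gets its own FileSystem instance;
+      // otherwise closing one client could close a cached FileSystem still in use elsewhere.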
+      conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+      Metrics metrics = new Metrics();
+
+      boolean blockCacheEnabled = true;
+      if (blockCacheEnabled) {
+        boolean blockCacheGlobal = true;
+        boolean blockCacheReadEnabled = true;
+
+        int numberOfBlocksPerBank = 16384;
+        int blockSize = BlockDirectory.BLOCK_SIZE;
+        int bankCount = 1;
+        boolean directAllocation = true;
+        int slabSize = numberOfBlocksPerBank * blockSize;
+        int bufferSize = 128;
+        int bufferCount = 128 * 128;
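+        // One bank of 16384 blocks at BlockDirectory.BLOCK_SIZE (8 KB) is a 128 MB cache,
+        // allocated off-heap when directAllocation is true; -XX:MaxDirectMemorySize must cover it.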
+
+        synchronized (LuceneClient.class) {
+          if (globalBlockCache == null) {
+            BufferStore.initNewBuffer(bufferSize, bufferCount, metrics);
+
+            long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank * (long) blockSize;
+            globalBlockCache = new BlockCache(metrics, directAllocation, totalMemory, slabSize, blockSize);
+          }
+        }
+
+        Cache cache = new BlockDirectoryCache(globalBlockCache, path, metrics, blockCacheGlobal);
+        HdfsDirectory hdfsDir = new HdfsDirectory(new Path(path), HdfsLockFactory.INSTANCE, conf);
+        dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false);
+      } else {
+        dir = new HdfsDirectory(new Path(path), HdfsLockFactory.INSTANCE, conf);
+      }
+      directory = new NRTCachingDirectory(dir, 16, 192);
+    }
+    return directory;
+  }
+
   private void initIndex() throws IOException {
-    File dirFile = path.toFile();
-    boolean indexExists = dirFile.canRead() && dirFile.list().length > 1;
-    if (!indexExists) writer.commit();
+    if (!useHdfs(path)) {
+      File dirFile = new File(path);
+      boolean indexExists = dirFile.canRead() && dirFile.list().length > 1;
+      if (!indexExists) writer.commit();
+    } else {
+      writer.commit();
+      refresh();
+    }
   }
 
   public Map<String,Map<String,Object>> fieldsInfo() {
@@ -278,7 +344,7 @@
   }
 
   public static String createVersionString(
-    Path path,
+    String path,
     Map<String,Map<String,Object>> charfiltersInfo,
     Map<String,Map<String,Object>> tokenizersInfo,
     Map<String,Map<String,Object>> filtersInfo,
@@ -286,7 +352,7 @@
     Map<String,Map<String,Object>> fieldsInfo,
     String idField,String contentField,
     Long maxDocumentLength) {
-    return LuceneConfig.PARAM_PATH + ":" + path.toString() + "+"
+    return LuceneConfig.PARAM_PATH + ":" + path + "+"
          + LuceneConfig.PARAM_CHARFILTERS + ":" + Joiner.on(",").withKeyValueSeparator("=").join(charfiltersInfo) + "+"
          + LuceneConfig.PARAM_TOKENIZERS + ":" + Joiner.on(",").withKeyValueSeparator("=").join(tokenizersInfo) + "+"
          + LuceneConfig.PARAM_FILTERS + ":" + Joiner.on(",").withKeyValueSeparator("=").join(filtersInfo) + "+"
@@ -371,10 +437,12 @@
   }
 
   public LeafReader reader() throws IOException {
+    // The caller is responsible for ensuring that the returned reader is closed.
     return SlowCompositeReaderWrapper.wrap(DirectoryReader.open(writer.getDirectory()));
   }
 
   public IndexSearcher newSearcher() throws IOException {
+    // The caller is responsible for ensuring that the returned searcher's reader is closed.
     return new IndexSearcher(DirectoryReader.open(writer.getDirectory()));
   }
 
diff --git a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClientManager.java b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClientManager.java
index f2e4995..05a9342 100644
--- a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClientManager.java
+++ b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneClientManager.java
@@ -1,6 +1,5 @@
 package org.apache.manifoldcf.agents.output.lucene;
 
-import java.io.File;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
@@ -12,24 +11,31 @@
   private LuceneClientManager() { }
 
   public synchronized static LuceneClient getClient(
-                      String path,
+                      String path, String processID,
                       String charfilters, String tokenizers, String filters,
                       String analyzers, String fields,
                       String idField, String contentField,
                       Long maxDocumentLength) throws Exception
   {
-    LuceneClient client = clients.get(path);
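+    // For an HDFS index, key the client on a per-process sub-directory so that concurrent
+    // ManifoldCF processes each write to their own index and never contend for the write lock.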
+    String paramPath;
+    if (!LuceneClient.useHdfs(path)) {
+      paramPath = path;
+    } else {
+      paramPath = (processID.equals("")) ? path : path + "/" + processID;
+    }
+
+    LuceneClient client = clients.get(paramPath);
 
     if (client == null) {
-      return newClient(path, charfilters, tokenizers, filters, analyzers, fields, idField, contentField, maxDocumentLength);
+      return newClient(paramPath, charfilters, tokenizers, filters, analyzers, fields, idField, contentField, maxDocumentLength);
     }
 
     if (client != null) {
       if (!client.isOpen()) {
-        return newClient(path, charfilters, tokenizers, filters, analyzers, fields, idField, contentField, maxDocumentLength);
+        return newClient(paramPath, charfilters, tokenizers, filters, analyzers, fields, idField, contentField, maxDocumentLength);
       }
       String latestVersion = LuceneClient.createVersionString(
-          new File(path).toPath(),
+          paramPath,
           LuceneClient.parseAsMap(charfilters), 
           LuceneClient.parseAsMap(tokenizers),
           LuceneClient.parseAsMap(filters),
@@ -53,9 +59,16 @@
           String idField, String contentField,
           Long maxDocumentLength) throws Exception
   {
-    LuceneClient client =  new LuceneClient(new File(path).toPath(),
-                           charfilters, tokenizers, filters, analyzers, fields,
-                           idField, contentField, maxDocumentLength);
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    LuceneClient client;
+    try {
+      Thread.currentThread().setContextClassLoader(LuceneClientManager.class.getClassLoader());
+      client = new LuceneClient(path,
+        charfilters, tokenizers, filters, analyzers, fields,
+        idField, contentField, maxDocumentLength);
+    } finally {
+      Thread.currentThread().setContextClassLoader(cl);
+    }
     clients.put(path, client);
     return client;
   }
diff --git a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneConnector.java b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneConnector.java
index 25fb28e..c92c22a 100644
--- a/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneConnector.java
+++ b/connectors/lucene/connector/src/main/java/org/apache/manifoldcf/agents/output/lucene/LuceneConnector.java
@@ -36,6 +36,7 @@
 import org.apache.manifoldcf.core.interfaces.IThreadContext;
 import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
 import org.apache.manifoldcf.core.interfaces.VersionContext;
+import org.apache.manifoldcf.core.system.ManifoldCF;
 import org.apache.manifoldcf.crawler.system.Logging;
 
 
@@ -166,7 +167,7 @@
 
       try
       {
-        client = LuceneClientManager.getClient(path,
+        client = LuceneClientManager.getClient(path, ManifoldCF.getProcessID(),
                    charfilters, tokenizers, filters, analyzers, fields,
                    idField, contentField, maxDocumentLength);
       }
diff --git a/connectors/lucene/connector/src/test/java/org/apache/manifoldcf/agents/output/lucene/tests/LuceneClientTest.java b/connectors/lucene/connector/src/test/java/org/apache/manifoldcf/agents/output/lucene/tests/LuceneClientTest.java
index 953f671..5067150 100644
--- a/connectors/lucene/connector/src/test/java/org/apache/manifoldcf/agents/output/lucene/tests/LuceneClientTest.java
+++ b/connectors/lucene/connector/src/test/java/org/apache/manifoldcf/agents/output/lucene/tests/LuceneClientTest.java
@@ -24,7 +24,10 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
@@ -32,8 +35,8 @@
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.BytesRef;
-
 import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
 import org.apache.manifoldcf.agents.output.lucene.LuceneClient;
 import org.apache.manifoldcf.agents.output.lucene.LuceneClientManager;
@@ -45,8 +48,11 @@
 import com.google.common.io.ByteSource;
 
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 import static org.hamcrest.CoreMatchers.*;
 
@@ -55,9 +61,29 @@
   private static final String sep = StandardSystemProperty.FILE_SEPARATOR.value();
   private File testDir;
 
+  static {
+    System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog");
+  }
+  private static MiniDFSCluster hdfsCluster;
+  private static String hdfsPath;
+
   private static final String ID = LuceneClient.defaultIdField();
   private static final String CONTENT = LuceneClient.defaultContentField();
 
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    hdfsCluster = builder.build();
+    hdfsPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/HdfsTest";
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    hdfsCluster.shutdown();
+  }
+
   @Before
   public void setUp() {
     String root = getClass().getResource("/").getFile();
@@ -90,7 +116,7 @@
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"openclose-index";
     File f = new File(path);
     assertThat(f.exists(), is(false));
-    LuceneClient client = new LuceneClient(f.toPath());
+    LuceneClient client = new LuceneClient(path);
     assertThat(f.exists(), is(true));
     assertThat(client.isOpen(), is(true));
     client.close();
@@ -100,7 +126,7 @@
   @Test
   public void testInitIndexDir() throws IOException {
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"initindexdir-index";
-    LuceneClient client = new LuceneClient(new File(path).toPath());
+    LuceneClient client = new LuceneClient(path);
     List<String> indexDirList = Arrays.asList(new File(path).list());
     assertThat(indexDirList.size(), is(2));
     assertThat(indexDirList.contains("write.lock"), is(true));
@@ -108,6 +134,7 @@
 
     IndexSearcher searcher = client.newSearcher();
     assertThat(searcher.count(new MatchAllDocsQuery()), is(0));
+    searcher.getIndexReader().close();
 
     assertThat(client.newRealtimeSearcher().count(new MatchAllDocsQuery()), is(0));
     client.close();
@@ -118,12 +145,12 @@
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"getclientfrommanager-index";
 
     LuceneClient client1 =
-      LuceneClientManager.getClient(path, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+      LuceneClientManager.getClient(path, ManifoldCF.getProcessID(), LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
         LuceneClient.defaultIdField(), LuceneClient.defaultContentField(), LuceneClient.defaultMaxDocumentLength());
     assertThat(client1.isOpen(), is(true));
 
     LuceneClient client2 =
-      LuceneClientManager.getClient(path, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+      LuceneClientManager.getClient(path, ManifoldCF.getProcessID(), LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
         "id", "content", LuceneClient.defaultMaxDocumentLength());
     assertThat(client2.isOpen(), is(true));
 
@@ -132,7 +159,7 @@
     LuceneClient client3;
     try {
       client3 =
-        LuceneClientManager.getClient(path, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+        LuceneClientManager.getClient(path, ManifoldCF.getProcessID(), LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
           "dummy_id", "dummy_content", LuceneClient.defaultMaxDocumentLength());
       fail("Should not get here");
     } catch (Exception e) {
@@ -145,7 +172,56 @@
     assertThat(client1, is(client2));
 
     client3 =
-      LuceneClientManager.getClient(path, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+      LuceneClientManager.getClient(path, ManifoldCF.getProcessID(), LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+        "dummy_id", "dummy_content", LuceneClient.defaultMaxDocumentLength());
+    assertThat(client3.isOpen(), is(true));
+
+    assertThat(client3, not(client1));
+    assertThat(client3, not(client2));
+  }
+
+  @Test
+  public void testGetClientFromManagerHdfs() throws Exception {
+    String path = hdfsPath+"/getclientfrommanager";
+    String processID_A = "A";
+    String processID_B = "B";
+
+    LuceneClient client1 =
+      LuceneClientManager.getClient(path, processID_A, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+        LuceneClient.defaultIdField(), LuceneClient.defaultContentField(), LuceneClient.defaultMaxDocumentLength());
+    assertThat(client1.isOpen(), is(true));
+
+    LuceneClient client2 =
+      LuceneClientManager.getClient(path, processID_A, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+        "id", "content", LuceneClient.defaultMaxDocumentLength());
+    assertThat(client2.isOpen(), is(true));
+
+    assertThat(client1, is(client2));
+
+    LuceneClient clientB =
+      LuceneClientManager.getClient(path, processID_B, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+        LuceneClient.defaultIdField(), LuceneClient.defaultContentField(), LuceneClient.defaultMaxDocumentLength());
+    assertThat(clientB.isOpen(), is(true));
+
+    assertThat(clientB, not(client2));
+
+    LuceneClient client3;
+    try {
+      client3 =
+        LuceneClientManager.getClient(path, processID_A, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
+          "dummy_id", "dummy_content", LuceneClient.defaultMaxDocumentLength());
+      fail("Should not get here");
+    } catch (Exception e) {
+      assert e instanceof IllegalStateException;
+    }
+
+    client1.close();
+    assertThat(client1.isOpen(), is(false));
+    assertThat(client2.isOpen(), is(false));
+    assertThat(client1, is(client2));
+
+    client3 =
+      LuceneClientManager.getClient(path, processID_A, LuceneClient.defaultCharfilters(), LuceneClient.defaultTokenizers(), LuceneClient.defaultFilters(), LuceneClient.defaultAnalyzers(), LuceneClient.defaultFields(),
         "dummy_id", "dummy_content", LuceneClient.defaultMaxDocumentLength());
     assertThat(client3.isOpen(), is(true));
 
@@ -156,7 +232,7 @@
   @Test
   public void testAddOrReplace() throws IOException {
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"addorreplace-index";
-    try (LuceneClient client = new LuceneClient(new File(path).toPath())) {
+    try (LuceneClient client = new LuceneClient(path)) {
       // add
       LuceneDocument doc1 = new LuceneDocument();
       doc1 = LuceneDocument.addField(doc1, ID, "/repo/001", client.fieldsInfo());
@@ -172,6 +248,7 @@
       IndexSearcher searcher = client.newSearcher();
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "green"))), is(1));
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "yellow"))), is(1));
+      searcher.getIndexReader().close();
 
       // update
       LuceneDocument updateDoc = new LuceneDocument();
@@ -183,6 +260,7 @@
       searcher = client.newSearcher();
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "green"))), is(0));
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "yellow"))), is(2));
+      searcher.getIndexReader().close();
 
       // add
       LuceneDocument addDoc = new LuceneDocument();
@@ -195,13 +273,14 @@
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "green"))), is(0));
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "yellow"))), is(2));
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "red"))), is(1));
+      searcher.getIndexReader().close();
     }
   }
 
   @Test
   public void testRemove() throws IOException {
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"remove-index";
-    try (LuceneClient client = new LuceneClient(new File(path).toPath())) {
+    try (LuceneClient client = new LuceneClient(path)) {
 
       LuceneDocument doc1 = new LuceneDocument();
       doc1 = LuceneDocument.addField(doc1, ID, "/repo/001", client.fieldsInfo());
@@ -216,19 +295,21 @@
       client.optimize();
       IndexSearcher searcher = client.newSearcher();
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "apache"))), is(2));
+      searcher.getIndexReader().close();
 
       client.remove("/repo/001");
 
       client.optimize();
       searcher = client.newSearcher();
       assertThat(searcher.count(new TermQuery(new Term(CONTENT, "apache"))), is(1));
+      searcher.getIndexReader().close();
     }
   }
 
   @Test
   public void testDefaultSettings() throws IOException, InterruptedException {
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"defaultsettings-index";
-    try (LuceneClient client = new LuceneClient(new File(path).toPath())) {
+    try (LuceneClient client = new LuceneClient(path)) {
 
       String content1 = "Apache ManifoldCF, Apache Lucene";
       LuceneDocument doc1 = new LuceneDocument();
@@ -264,20 +345,22 @@
 
       TopDocs hits = searcher.search(client.newQuery("id:\\/repo\\/001"), 1);
       int docID = hits.scoreDocs[0].doc;
-      Terms terms = client.reader().getTermVector(docID, CONTENT);
-      TermsEnum te = terms.iterator();
-      BytesRef br;
-      while ((br = te.next()) != null) {
-        if (te.seekExact(new BytesRef("apache"))) {
-          assertThat(br.utf8ToString(), is("apache"));
-          assertThat(te.totalTermFreq(), is(2L));
-          break;
+      try (LeafReader reader = client.reader()) {
+        Terms terms = reader.getTermVector(docID, CONTENT);
+        TermsEnum te = terms.iterator();
+        BytesRef br;
+        while ((br = te.next()) != null) {
+          if (te.seekExact(new BytesRef("apache"))) {
+            assertThat(br.utf8ToString(), is("apache"));
+            assertThat(te.totalTermFreq(), is(2L));
+            break;
+          }
         }
-      }
-      assertThat(client.reader().docFreq(new Term(CONTENT, br)), is(3));
+        assertThat(reader.docFreq(new Term(CONTENT, br)), is(3));
 
-      assertThat(client.reader().getTermVector(docID, "content_ws"), is(nullValue()));
-      assertThat(client.reader().getTermVector(docID, "content_ngram"), is(nullValue()));
+        assertThat(reader.getTermVector(docID, "content_ws"), is(nullValue()));
+        assertThat(reader.getTermVector(docID, "content_ngram"), is(nullValue()));
+      }
 
       hits = searcher.search(client.newQuery("id:\\/repo\\/003"), 1);
       Document storedDocument = searcher.doc(hits.scoreDocs[0].doc);
@@ -294,6 +377,9 @@
       IndexSearcher searcher2 = client.newSearcher();
       assertThat(searcher2.count(client.newQuery(ID+":"+nrt)), is(0));
       assertThat(client.newRealtimeSearcher().count(client.newQuery(ID+":"+nrt)), is(1));
+
+      searcher.getIndexReader().close();
+      searcher2.getIndexReader().close();
     }
   }
 
@@ -310,7 +396,7 @@
     rd.setBinary(in, b.length);
 
     String path = testDir.getAbsolutePath()+sep+"tmp"+sep+"rd-index";
-    try (LuceneClient client = new LuceneClient(new File(path).toPath())) {
+    try (LuceneClient client = new LuceneClient(path)) {
       LuceneDocument doc = new LuceneDocument();
 
       doc = LuceneDocument.addField(doc, client.idField(), documentURI, client.fieldsInfo());
@@ -339,6 +425,92 @@
       assertThat(searcher.count(client.newQuery("content:categorization")), is(1));
       assertThat(searcher.count(client.newQuery("content:tagging")), is(1));
       assertThat(searcher.count(client.newQuery("content:(classification AND lucene)")), is(1));
+      searcher.getIndexReader().close();
+    }
+  }
+
+  @Test
+  public void testHdfsSimple1() throws IOException {
+    try (LuceneClient client = new LuceneClient(hdfsPath+"/Simple1")) {
+      for (int i = 0; i < 10; i++) {
+        LuceneDocument doc = new LuceneDocument();
+        doc = LuceneDocument.addField(doc, ID, String.valueOf(i), client.fieldsInfo());
+        doc = LuceneDocument.addField(doc, CONTENT, ByteSource.wrap("hdfs directory?".getBytes(StandardCharsets.UTF_8)).openBufferedStream(), client.fieldsInfo());
+        client.addOrReplace(String.valueOf(i), doc);
+      }
+      client.refresh();
+      assertThat(client.newRealtimeSearcher().count(client.newQuery("content:hdfs")), is(10));
+
+      client.remove(String.valueOf(0));
+      client.refresh();
+      assertThat(client.newRealtimeSearcher().count(client.newQuery("content:hdfs")), is(9));
+    }
+  }
+
+  @Test
+  public void testHdfsSimple2() throws IOException {
+    try (LuceneClient client = new LuceneClient(hdfsPath+"/Simple2")) {
+      for (int i = 0; i < 10; i++) {
+        LuceneDocument doc = new LuceneDocument();
+        doc = LuceneDocument.addField(doc, ID, String.valueOf(i), client.fieldsInfo());
+        doc = LuceneDocument.addField(doc, CONTENT, ByteSource.wrap("hdfs directory.".getBytes(StandardCharsets.UTF_8)).openBufferedStream(), client.fieldsInfo());
+        client.addOrReplace(String.valueOf(i), doc);
+      }
+      client.optimize();
+      IndexSearcher searcher = client.newSearcher();
+      assertThat(searcher.count(client.newQuery("content:hdfs")), is(10));
+      searcher.getIndexReader().close();
+
+      client.remove(String.valueOf(0));
+      client.optimize();
+      IndexSearcher searcher2 = client.newSearcher();
+      assertThat(searcher2.count(client.newQuery("content:hdfs")), is(9));
+      searcher2.getIndexReader().close();
+    }
+  }
+
+  @Test
+  public void testHdfsLock() throws IOException {
+    String samePath = testDir.getAbsolutePath()+sep+"tmp"+sep+"lock";
+    try {
+      try (LuceneClient client1 = new LuceneClient(samePath);
+           LuceneClient client2 = new LuceneClient(samePath)) {
+        fail("Should not get here");
+      }
+    } catch (Exception e) {
+      assert e instanceof LockObtainFailedException;
+    }
+
+    samePath = hdfsPath+"/lock";
+    String processID_A = "/A";
+    String processID_B = "/B";
+    try {
+      try (LuceneClient client1 = new LuceneClient(samePath+processID_A);
+           LuceneClient client2 = new LuceneClient(samePath+processID_B)) {
+        for (int i = 0; i < 2; i++) {
+          LuceneDocument doc1 = new LuceneDocument();
+          doc1 = LuceneDocument.addField(doc1, ID, "A"+String.valueOf(i), client1.fieldsInfo());
+          client1.addOrReplace("A"+String.valueOf(i), doc1);
+        }
+        client1.commit();
+        for (int i = 0; i < 3; i++) {
+          LuceneDocument doc2 = new LuceneDocument();
+          doc2 = LuceneDocument.addField(doc2, ID, "B"+String.valueOf(i), client2.fieldsInfo());
+          client2.addOrReplace("B"+String.valueOf(i), doc2);
+        }
+        client2.commit();
+
+        IndexSearcher searcher1 = client1.newSearcher();
+        assertThat(searcher1.count(client1.newQuery("*:*")), is(2));
+        searcher1.getIndexReader().close();
+
+        IndexSearcher searcher2 = client2.newSearcher();
+        assertThat(searcher2.count(client2.newQuery("*:*")), is(3));
+        searcher2.getIndexReader().close();
+      }
+    } catch (Exception e) {
+      System.out.println(e.getMessage());
+      fail("Should not get here");
     }
   }
 
diff --git a/connectors/lucene/hadoop-minicluster.xml b/connectors/lucene/hadoop-minicluster.xml
new file mode 100644
index 0000000..f2a81bd
--- /dev/null
+++ b/connectors/lucene/hadoop-minicluster.xml
@@ -0,0 +1,33 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.manifoldcf</groupId>
+  <artifactId>mcf-lucene-connector-hadoop-minicluster</artifactId>
+  <version>2.2-SNAPSHOT</version>
+  <modelVersion>4.0.0</modelVersion>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>2.6.0</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/connectors/lucene/pom.xml b/connectors/lucene/pom.xml
index ddb0c33..5274819 100644
--- a/connectors/lucene/pom.xml
+++ b/connectors/lucene/pom.xml
@@ -135,6 +135,9 @@
           </excludes>
           <forkMode>always</forkMode>
           <workingDirectory>target/test-output</workingDirectory>
+          <argLine>-XX:MaxDirectMemorySize=512m</argLine>
         </configuration>
       </plugin>
 
@@ -236,6 +239,76 @@
       <artifactId>gson</artifactId>
       <version>2.2.4</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-core</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+      <artifactId>concurrentlinkedhashmap-lru</artifactId>
+      <version>1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-solrj</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>3.0.4</version>
+    </dependency>
 
     <!-- Testing dependencies -->
     
@@ -376,5 +449,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 </project>