// blob: d78a052da49c6457bdec4a0b4a8e43934f49736b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.HdfsDirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.store.blockcache.BlockCache;
import org.apache.solr.store.blockcache.BlockDirectory;
import org.apache.solr.store.blockcache.BlockDirectoryCache;
import org.apache.solr.store.blockcache.Cache;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.apache.solr.util.RefCounted;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Indexes concurrently into a random number of HDFS-backed collections and then verifies that
 * the documents are all searchable, that each core's reported data directory size matches what
 * HDFS reports, and that every core's index writer sits on a {@link BlockDirectory} with read
 * caching enabled and write caching disabled.
 */
@Slow
@Nightly
@ThreadLeakFilters(defaultFilters = true, filters = {
    SolrIgnoredThreadsFilter.class,
    QuickPatchThreadsFilter.class,
    BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
})
public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest {
  /** Name prefix of the collections created by {@link #test()}; an index is appended to it. */
  private static final String ACOLLECTION = "acollection";

  /** DFS cluster shared by the whole test class; set in {@link #setupClass()}. */
  private static MiniDFSCluster dfsCluster;

  /**
   * Selects the schema used by the test and obtains the DFS cluster from
   * {@link HdfsTestUtil#setupClass}, handing it the absolute path of a directory returned by
   * {@code createTempDir()}.
   */
  @BeforeClass
  public static void setupClass() throws Exception {
    schemaString = "schema15.xml"; // we need a string id
    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
  }

  /**
   * Hands the DFS cluster back to {@link HdfsTestUtil#teardownClass} and clears the static state
   * set by {@link #setupClass()}, even when the teardown itself fails.
   */
  @AfterClass
  public static void teardownClass() throws Exception {
    try {
      HdfsTestUtil.teardownClass(dfsCluster);
    } finally {
      dfsCluster = null;
      schemaString = null;
    }
  }

  /** Resolves the data directory through {@link HdfsTestUtil} against the shared DFS cluster. */
  @Override
  protected String getDataDir(String dataDir) throws IOException {
    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
  }

  /** Configures the base test with a slice count of one and a fixed shard count of three. */
  public HdfsWriteToMultipleCollectionsTest() {
    super();
    sliceCount = 1;
    fixShardCount(3);
  }

  /**
   * Returns the name of the solr.xml to use.
   * NOTE(review): presumably overrides a hook of the base test class; no {@code @Override} is
   * present in the original, so confirm against the superclass before adding one.
   */
  protected String getSolrXml() {
    return "solr.xml";
  }

  /**
   * Creates between one and four collections, runs one indexing thread per collection, and then
   * checks the total document count and the HDFS directory / block cache setup of every core
   * that belongs to one of those collections.
   */
  @Test
  public void test() throws Exception {
    // Keep the order of the random() calls stable so a given seed reproduces the same run.
    int docCount = random().nextInt(1313) + 1;
    int cnt = random().nextInt(4) + 1;
    for (int i = 0; i < cnt; i++) {
      createCollection(ACOLLECTION + i, "conf1", 2, 2, 9);
    }
    for (int i = 0; i < cnt; i++) {
      waitForRecoveriesToFinish(ACOLLECTION + i, false);
    }

    List<CloudSolrClient> cloudClients = new ArrayList<>();
    List<StoppableIndexingThread> threads = new ArrayList<>();
    int addCnt = 0;
    long collectionsCount = 0;
    try {
      for (int i = 0; i < cnt; i++) {
        CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress());
        // Register the client before anything else can throw so the finally block closes it.
        cloudClients.add(client);
        client.setDefaultCollection(ACOLLECTION + i);
        StoppableIndexingThread indexThread =
            new StoppableIndexingThread(null, client, "1", true, docCount, 1, true);
        threads.add(indexThread);
        indexThread.start();
      }

      for (StoppableIndexingThread thread : threads) {
        thread.join();
        addCnt += thread.getNumAdds() - thread.getNumDeletes();
      }

      for (CloudSolrClient client : cloudClients) {
        client.commit();
        collectionsCount += client.query(new SolrQuery("*:*")).getResults().getNumFound();
      }
    } finally {
      // Close the clients on the failure path too; otherwise a failed join/commit/query
      // would leave every client created so far open.
      IOUtils.close(cloudClients);
    }
    assertEquals(addCnt, collectionsCount);

    BlockCache lastBlockCache = null;
    // assert that we are using the block directory and that write and read caching are being used
    for (JettySolrRunner jetty : jettys) {
      CoreContainer cores = jetty.getCoreContainer();
      Collection<SolrCore> solrCores = cores.getCores();
      for (SolrCore core : solrCores) {
        if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName()
            .startsWith(ACOLLECTION)) {
          assertDataDirSizeMatchesHdfs(core);
          lastBlockCache = assertBlockCacheSetup(core, lastBlockCache);
        }
      }
    }
  }

  /**
   * Asserts that the core uses an {@link HdfsDirectoryFactory} and that the size the factory
   * reports for the core's data directory equals the content length HDFS reports for that path.
   */
  private void assertDataDirSizeMatchesHdfs(SolrCore core) throws Exception {
    DirectoryFactory factory = core.getDirectoryFactory();
    assertTrue("Found: " + core.getDirectoryFactory().getClass().getName(), factory instanceof HdfsDirectoryFactory);
    Directory dir = factory.get(core.getDataDir(), null, null);
    try {
      long dataDirSize = factory.size(dir);
      Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
      Path dataDirPath = new Path(core.getDataDir());
      // FileSystem.newInstance hands out an instance the caller owns, so it must be closed here.
      try (FileSystem fileSystem = FileSystem.newInstance(dataDirPath.toUri(), conf)) {
        long size = fileSystem.getContentSummary(dataDirPath).getLength();
        assertEquals(size, dataDirSize);
      }
    } finally {
      core.getDirectoryFactory().release(dir);
    }
  }

  /**
   * Asserts that the core's index writer runs on an {@link NRTCachingDirectory} wrapping a
   * {@link BlockDirectory} with read caching on and write caching off, and compares the core's
   * {@link BlockCache} with the one seen on the previously checked core.
   *
   * @param core the core to inspect
   * @param lastBlockCache the block cache of the previously inspected core, or {@code null} if
   *     this is the first one
   * @return the block cache used by {@code core}
   */
  private BlockCache assertBlockCacheSetup(SolrCore core, BlockCache lastBlockCache) throws Exception {
    RefCounted<IndexWriter> iwRef = core.getUpdateHandler()
        .getSolrCoreState().getIndexWriter(core);
    try {
      IndexWriter iw = iwRef.get();
      NRTCachingDirectory directory = (NRTCachingDirectory) iw.getDirectory();
      BlockDirectory blockDirectory = (BlockDirectory) directory.getDelegate();
      assertTrue(blockDirectory.isBlockCacheReadEnabled());
      // see SOLR-6424
      assertFalse(blockDirectory.isBlockCacheWriteEnabled());
      Cache cache = blockDirectory.getCache();
      // we know it's a BlockDirectoryCache, but future proof
      assertTrue(cache instanceof BlockDirectoryCache);
      BlockCache blockCache = ((BlockDirectoryCache) cache).getBlockCache();
      if (lastBlockCache != null) {
        // With the global block cache switched on the caches are expected to match across
        // cores; otherwise each core must have its own instance.
        if (Boolean.getBoolean("solr.hdfs.blockcache.global")) {
          assertEquals(lastBlockCache, blockCache);
        } else {
          assertNotSame(lastBlockCache, blockCache);
        }
      }
      return blockCache;
    } finally {
      iwRef.decref();
    }
  }
}