blob: 7929ed674c4f7902d9f01561aa898bb7bf11fa64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.nio.charset.Charset;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class DistributedQueueTest extends SolrTestCaseJ4 {
private static final Charset UTF8 = Charset.forName("UTF-8");
protected ZkTestServer zkServer;
protected SolrZkClient zkClient;
protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("dqtest-"));
@Before
@Override
public void setUp() throws Exception {
super.setUp();
setupZk();
}
@Test
public void testDistributedQueue() throws Exception {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue dq = makeDistributedQueue(dqZNode);
// basic ops
assertNull(dq.poll());
try {
dq.remove();
fail("NoSuchElementException expected");
} catch (NoSuchElementException expected) {
// expected
}
dq.offer(data);
assertArrayEquals(dq.peek(500), data);
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
dq.offer(data);
assertArrayEquals(dq.take(), data); // waits for data
assertNull(dq.poll());
dq.offer(data);
dq.peek(true); // wait until data is definitely there before calling remove
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 1000)).start();
assertNotNull(dq.peek(true));
assertNotNull(dq.remove());
assertNull(dq.poll());
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
qct.start();
assertNull(dq.peek(500));
qct.join();
}
@Test
public void testDistributedQueueCache() throws Exception {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
ZkDistributedQueue consumer = makeDistributedQueue(dqZNode);
DistributedQueue producer = makeDistributedQueue(dqZNode);
DistributedQueue producer2 = makeDistributedQueue(dqZNode);
producer2.offer(data);
producer.offer(data);
producer.offer(data);
consumer.poll();
assertEquals(2, consumer.getZkStats().getQueueLength());
producer.offer(data);
producer2.offer(data);
consumer.poll();
// Wait for watcher being kicked off
while (!consumer.isDirty()) {
Thread.sleep(20);
}
// DQ still have elements in their queue, so we should not fetch elements path from Zk
assertEquals(1, consumer.getZkStats().getQueueLength());
consumer.poll();
consumer.peek();
assertEquals(2, consumer.getZkStats().getQueueLength());
}
@Test
public void testDistributedQueueBlocking() throws Exception {
String dqZNode = "/distqueue/test";
String testData = "hello world";
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertNull(dq.peek());
Future<String> future = executor.submit(() -> new String(dq.peek(true), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
// After draining the queue, a watcher should be set.
assertNull(dq.peek(100));
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeout.waitFor("Timeout waiting to see dirty=false", () -> {
try {
return !dq.isDirty();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assertFalse(dq.isDirty());
assertEquals(1, dq.watcherCount());
forceSessionExpire();
// Session expiry should have fired the watcher.
Thread.sleep(100);
assertTrue(dq.isDirty());
assertEquals(0, dq.watcherCount());
// Rerun the earlier test make sure updates are still seen, post reconnection.
future = executor.submit(() -> new String(dq.peek(true), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
assertNull(dq.poll());
}
@Test
public void testLeakChildWatcher() throws Exception {
String dqZNode = "/distqueue/test";
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertNull(dq.peek());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertNull(dq.peek(10));
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
dq.offer("hello world".getBytes(UTF8));
assertNotNull(dq.peek()); // synchronously available
// dirty and watcher state indeterminate here, race with watcher
Thread.sleep(100); // watcher should have fired now
assertNotNull(dq.peek());
// in case of race condition, childWatcher is kicked off after peek()
if (dq.watcherCount() == 0) {
assertTrue(dq.isDirty());
dq.poll();
dq.offer("hello world".getBytes(UTF8));
dq.peek();
}
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
}
@Test
public void testLocallyOffer() throws Exception {
String dqZNode = "/distqueue/test";
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
dq.peekElements(1, 1, s -> true);
for (int i = 0; i < 100; i++) {
byte[] data = String.valueOf(i).getBytes(UTF8);
dq.offer(data);
assertNotNull(dq.peek());
dq.poll();
dq.peekElements(1, 1, s -> true);
}
}
@Test
public void testPeekElements() throws Exception {
String dqZNode = "/distqueue/test";
byte[] data = "hello world".getBytes(UTF8);
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
// Populate with data.
dq.offer(data);
dq.offer(data);
dq.offer(data);
Predicate<String> alwaysTrue = s -> true;
Predicate<String> alwaysFalse = s -> false;
// Should be able to get 0, 1, 2, or 3 instantly
for (int i = 0; i <= 3; ++i) {
assertEquals(i, dq.peekElements(i, 0, alwaysTrue).size());
}
// Asking for more should return only 3.
assertEquals(3, dq.peekElements(4, 0, alwaysTrue).size());
// If we filter everything out, we should block for the full time.
long start = System.nanoTime();
assertEquals(0, dq.peekElements(4, 1000, alwaysFalse).size());
assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
// If someone adds a new matching element while we're waiting, we should return immediately.
executor.submit(() -> {
try {
Thread.sleep(500);
dq.offer(data);
} catch (Exception e) {
// ignore
}
});
start = System.nanoTime();
assertEquals(1, dq.peekElements(4, 2000, child -> {
// The 4th element in the queue will end with a "3".
return child.endsWith("3");
}).size());
long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
assertTrue("Time was " + timeTaken + "ms, expected 250-1500ms", timeTaken > 250 && timeTaken < 1500);
}
private void forceSessionExpire() throws InterruptedException, TimeoutException {
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
zkServer.expire(sessionId);
zkClient.getConnectionManager().waitForDisconnected(10000);
zkClient.getConnectionManager().waitForConnected(10000);
for (int i = 0; i < 100; ++i) {
if (zkClient.isConnected()) {
break;
}
Thread.sleep(50);
}
assertTrue(zkClient.isConnected());
assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId());
}
protected ZkDistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new ZkDistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
private static class QueueChangerThread extends Thread {
DistributedQueue dq;
long waitBeforeOfferMs;
QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
this.dq = dq;
this.waitBeforeOfferMs = waitBeforeOfferMs;
}
public void run() {
try {
Thread.sleep(waitBeforeOfferMs);
dq.offer(getName().getBytes(UTF8));
} catch (InterruptedException ie) {
// do nothing
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}
}
protected String setupNewDistributedQueueZNode(String znodePath) throws Exception {
if (!zkClient.exists("/", true))
zkClient.makePath("/", false, true);
if (zkClient.exists(znodePath, true))
zkClient.clean(znodePath);
zkClient.makePath(znodePath, false, true);
return znodePath;
}
@Override
@After
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception exc) {
}
closeZk();
executor.shutdown();
}
protected void setupZk() throws Exception {
System.setProperty("zkClientTimeout", "8000");
zkServer = new ZkTestServer(createTempDir("zkData"));
zkServer.run();
System.setProperty("zkHost", zkServer.getZkAddress());
zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
assertTrue(zkClient.isConnected());
}
protected void closeZk() throws Exception {
if (null != zkClient) {
zkClient.close();
zkClient = null;
}
if (null != zkServer) {
zkServer.shutdown();
zkServer = null;
}
}
}