blob: 4ac480b11a99fa6fb2f0ab35248728bb1878025e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.nio.charset.Charset;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.junit.After;
import org.junit.Test;
/**
*
*/
public class TestSimDistributedQueue extends SolrTestCaseJ4 {
private static final Charset UTF8 = Charset.forName("UTF-8");
protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("sdqtest-"));
@Test
// commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
public void testDistributedQueue() throws Exception {
String dqZNode = "/distqueue/test1";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue dq = makeDistributedQueue(dqZNode);
// basic ops
assertNull(dq.poll());
try {
dq.remove();
fail("NoSuchElementException expected");
} catch (NoSuchElementException expected) {
// expected
}
dq.offer(data);
assertArrayEquals(dq.peek(500), data);
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
dq.offer(data);
assertArrayEquals(dq.take(), data); // waits for data
assertNull(dq.poll());
dq.offer(data);
dq.peek(15000); // wait until data is definitely there before calling remove
assertArrayEquals(dq.remove(), data);
assertNull(dq.poll());
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 1000)).start();
assertNotNull(dq.peek(15000));
assertNotNull(dq.remove());
assertNull(dq.poll());
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
qct.start();
assertNull(dq.peek(500));
qct.join();
}
@Test
public void testDistributedQueueBlocking() throws Exception {
String dqZNode = "/distqueue/test2";
String testData = "hello world";
DistributedQueue dq = makeDistributedQueue(dqZNode);
assertNull(dq.peek());
Future<String> future = executor.submit(() -> new String(dq.peek(15000), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
assertNull(dq.peek(100));
// Rerun the earlier test make sure updates are still seen, post reconnection.
future = executor.submit(() -> new String(dq.peek(15000), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
} catch (TimeoutException expected) {
assertFalse(future.isDone());
}
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
assertNotNull(dq.poll());
assertNull(dq.poll());
}
@Test
public void testLocallyOffer() throws Exception {
String dqZNode = "/distqueue/test3";
DistributedQueue dq = makeDistributedQueue(dqZNode);
dq.peekElements(1, 1, s -> true);
for (int i = 0; i < 100; i++) {
byte[] data = String.valueOf(i).getBytes(UTF8);
dq.offer(data);
assertNotNull(dq.peek());
dq.poll();
dq.peekElements(1, 1, s -> true);
}
}
@Test
public void testPeekElements() throws Exception {
String dqZNode = "/distqueue/test4";
byte[] data = "hello world".getBytes(UTF8);
DistributedQueue dq = makeDistributedQueue(dqZNode);
// Populate with data.
dq.offer(data);
dq.offer(data);
dq.offer(data);
Predicate<String> alwaysTrue = s -> true;
Predicate<String> alwaysFalse = s -> false;
// Should be able to get 0, 1, 2, or 3 instantly
for (int i = 0; i <= 3; ++i) {
assertEquals(i, dq.peekElements(i, 0, alwaysTrue).size());
}
// Asking for more should return only 3.
assertEquals(3, dq.peekElements(4, 0, alwaysTrue).size());
// If we filter everything out, we should block for the full time.
long start = System.nanoTime();
assertEquals(0, dq.peekElements(4, 1000, alwaysFalse).size());
assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
// If someone adds a new matching element while we're waiting, we should return immediately.
executor.submit(() -> {
try {
Thread.sleep(500);
dq.offer(data);
} catch (Exception e) {
// ignore
}
});
start = System.nanoTime();
assertEquals(1, dq.peekElements(4, 2000, child -> {
// The 4th element in the queue will end with a "3".
return child.endsWith("3");
}).size());
long elapsed = System.nanoTime() - start;
assertTrue(elapsed < TimeUnit.MILLISECONDS.toNanos(1000));
assertTrue(elapsed >= TimeUnit.MILLISECONDS.toNanos(250));
}
protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
return new SimDistributedQueueFactory.SimDistributedQueue(dqZNode);
}
private static class QueueChangerThread extends Thread {
DistributedQueue dq;
long waitBeforeOfferMs;
QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
this.dq = dq;
this.waitBeforeOfferMs = waitBeforeOfferMs;
}
public void run() {
try {
Thread.sleep(waitBeforeOfferMs);
dq.offer(getName().getBytes(UTF8));
} catch (InterruptedException ie) {
// do nothing
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}
}
@Override
@After
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception exc) {
}
ExecutorUtil.shutdownAndAwaitTermination(executor);
}
}