blob: 9e3f323cedb9dd96286070a5d3f9f671c4b91df9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal.executor.pubsub;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.buildobjects.process.ProcBuilder;
import org.buildobjects.process.StreamConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.mocks.MockBinarySubscriber;
import org.apache.geode.redis.mocks.MockSubscriber;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.RedisPortSupplier;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public abstract class AbstractPubSubIntegrationTest implements RedisPortSupplier {
private Jedis publisher;
private Jedis subscriber;
public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
@ClassRule
public static ExecutorServiceRule executor = new ExecutorServiceRule();
@Before
public void setUp() {
subscriber = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
publisher = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
}
@After
public void tearDown() {
subscriber.close();
publisher.close();
}
@Test
@SuppressWarnings("unchecked")
public void punsubscribe_whenNonexistent() {
assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE, "Nonexistent"))
.containsExactly("punsubscribe".getBytes(), "Nonexistent".getBytes(), 0L);
}
@Test
@SuppressWarnings("unchecked")
public void unsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.UNSUBSCRIBE))
.containsExactly("unsubscribe".getBytes(), null, 0L);
}
@Test
@SuppressWarnings("unchecked")
public void punsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE))
.containsExactly("punsubscribe".getBytes(), null, 0L);
}
@Test
public void testOneSubscriberOneChannel() {
List<String> expectedMessages = Arrays.asList("hello");
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}
@Test
public void punsubscribe_givenSubscribe_doesNotReduceSubscriptions() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.punsubscribe("salutations");
waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
} finally {
// now cleanup the actual subscription
mockSubscriber.unsubscribe("salutations");
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void unsubscribe_givenPsubscribe_doesNotReduceSubscriptions() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.psubscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
} finally {
// now cleanup the actual subscription
mockSubscriber.punsubscribe("salutations");
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void unsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("NonExistent");
waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
} finally {
// now cleanup the actual subscription
mockSubscriber.unsubscribe("salutations");
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void unsubscribe_whenGivenAnEmptyString() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("");
waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
} finally {
// now cleanup the actual subscription
mockSubscriber.unsubscribe();
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void unsubscribeWithEmptyChannel_doesNotUnsubscribeExistingChannels() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("");
waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
Long result = publisher.publish("salutations", "heyho");
waitFor(() -> mockSubscriber.getReceivedMessages().size() == 1);
assertThat(result).isEqualTo(1);
assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo("heyho");
} finally {
// now cleanup the actual subscription
mockSubscriber.unsubscribe();
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void canSubscribeToAnEmptyString() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("", "blank");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).containsExactly("blank");
}
@Test
public void punsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.psubscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
try {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.punsubscribe("NonExistent");
waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
} finally {
// now cleanup the actual subscription
mockSubscriber.punsubscribe("salutations");
waitFor(() -> !subscriberThread.isAlive());
}
}
@Test
public void testPublishBinaryData() {
byte[] expectedMessage = new byte[256];
for (int i = 0; i < 256; i++) {
expectedMessage[i] = (byte) i;
}
MockBinarySubscriber mockSubscriber = new MockBinarySubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations".getBytes());
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations".getBytes(), expectedMessage);
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations".getBytes());
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(expectedMessage);
}
@Test
public void testSubscribeAndPublishUsingBinaryData() {
byte[] binaryBlob = new byte[256];
for (int i = 0; i < 256; i++) {
binaryBlob[i] = (byte) i;
}
MockBinarySubscriber mockSubscriber = new MockBinarySubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, binaryBlob);
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish(binaryBlob, binaryBlob);
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe(binaryBlob);
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);
}
@Test
public void testOneSubscriberSubscribingToTwoChannels() {
List<String> expectedMessages = Arrays.asList("hello", "howdy");
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
result = publisher.publish("yuletide", "howdy");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
mockSubscriber.unsubscribeInfos.clear();
mockSubscriber.unsubscribe("yuletide");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("yuletide");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}
@Test
public void testSubscribingAndUnsubscribingFromMultipleChannels() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
mockSubscriber.unsubscribe("yuletide", "salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.channel)
.collect(Collectors.toList());
assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
.map(x -> x.count)
.collect(Collectors.toList());
assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
}
@Test
public void testPsubscribingAndPunsubscribingFromMultipleChannels() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
mockSubscriber.punsubscribe("yul*", "sal*");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
new MockSubscriber.UnsubscribeInfo("yul*", 1),
new MockSubscriber.UnsubscribeInfo("sal*", 0));
}
@Test
public void testPunsubscribingImplicitlyFromAllChannels() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
mockSubscriber.punsubscribe();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
new MockSubscriber.UnsubscribeInfo("sal*", 1),
new MockSubscriber.UnsubscribeInfo("yul*", 0));
}
@Test
public void ensureOrderingOfPublishedMessages() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Future<Void> future1 =
executor.submit(() -> runSubscribeAndPublish(1, 10000, running));
running.set(false);
future1.get();
}
private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBoolean running)
throws Exception {
int iterationCount = 0;
Jedis publisher = getConnection();
while (iterationCount < minimumIterations || running.get()) {
List<String> result = subscribeAndPublish(index, iterationCount, publisher);
assertThat(result)
.as("Failed at iteration " + iterationCount)
.containsExactly("message", "pmessage");
iterationCount++;
}
publisher.close();
}
private List<String> subscribeAndPublish(int index, int iteration, Jedis localPublisher)
throws Exception {
String channel = index + ".foo.bar";
String pChannel = index + ".foo.*";
MockSubscriber mockSubscriber = new MockSubscriber();
try (Jedis localSubscriber = getConnection()) {
Future<Void> future =
executor.submit(() -> localSubscriber.subscribe(mockSubscriber, channel));
mockSubscriber.awaitSubscribe(channel);
mockSubscriber.psubscribe(pChannel);
mockSubscriber.awaitPSubscribe(pChannel);
localPublisher.publish(channel, "hello-" + index + "-" + iteration);
mockSubscriber.unsubscribe(channel);
mockSubscriber.awaitUnsubscribe(channel);
mockSubscriber.punsubscribe(pChannel);
mockSubscriber.awaitPunsubscribe(pChannel);
future.get();
}
return mockSubscriber.getReceivedEvents();
}
@Test
public void testTwoSubscribersOneChannel() {
Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
MockSubscriber mockSubscriber1 = new MockSubscriber();
MockSubscriber mockSubscriber2 = new MockSubscriber();
Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber1, "salutations");
Runnable runnable2 = () -> subscriber2.subscribe(mockSubscriber2, "salutations");
Thread subscriber1Thread = new Thread(runnable1);
subscriber1Thread.start();
Thread subscriber2Thread = new Thread(runnable2);
subscriber2Thread.start();
waitFor(() -> mockSubscriber1.getSubscribedChannels() == 1);
waitFor(() -> mockSubscriber2.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(2);
mockSubscriber1.unsubscribe("salutations");
waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0);
waitFor(() -> !subscriber1Thread.isAlive());
assertThat(mockSubscriber1.unsubscribeInfos).hasSize(1);
assertThat(mockSubscriber1.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber1.unsubscribeInfos.get(0).count).isEqualTo(0);
result = publisher.publish("salutations", "goodbye");
assertThat(result).isEqualTo(1);
mockSubscriber2.unsubscribe("salutations");
waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0);
waitFor(() -> !subscriber2Thread.isAlive());
assertThat(mockSubscriber2.unsubscribeInfos).hasSize(1);
assertThat(mockSubscriber2.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber2.unsubscribeInfos.get(0).count).isEqualTo(0);
assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello", "goodbye"));
subscriber2.close();
}
@Test
public void testPublishToNonexistentChannel() {
Long result = publisher.publish("thisChannelDoesn'tExist", "hello");
assertThat(result).isEqualTo(0);
}
@Test
public void testOneSubscriberOneChannelTwoTimes() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber, "salutations", "salutations");
Thread subscriberThread = new Thread(runnable1);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.unsubscribeInfos)
.containsExactly(new MockSubscriber.UnsubscribeInfo("salutations", 0));
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
}
@Test
public void testDeadSubscriber() {
Jedis deadSubscriber = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
deadSubscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
deadSubscriber.close();
waitFor(() -> !deadSubscriber.isConnected());
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(0);
assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
}
@Test
public void testPatternSubscribe() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.psubscribe(mockSubscriber, "sal*s");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
String message = "hello-" + System.currentTimeMillis();
Long result = publisher.publish("salutations", message);
assertThat(result).isEqualTo(1);
assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
GeodeAwaitility.await().until(() -> !mockSubscriber.getReceivedPMessages().isEmpty());
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly(message);
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos)
.containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
}
@Test
public void testSubscribeToSamePattern() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.psubscribe(mockSubscriber, "sal*s");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.psubscribe("sal*s");
GeodeAwaitility.await()
.during(5, TimeUnit.SECONDS)
.until(() -> mockSubscriber.getSubscribedChannels() == 1);
String message = "hello-" + System.currentTimeMillis();
Long result = publisher.publish("salutations", message);
assertThat(result).isEqualTo(1);
assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
GeodeAwaitility.await().until(() -> !mockSubscriber.getReceivedPMessages().isEmpty());
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly(message);
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.punsubscribeInfos)
.containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
}
@Test
public void testPatternAndRegularSubscribe() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.psubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(2);
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
assertThat(mockSubscriber.punsubscribeInfos).hasSize(1);
assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("sal*s");
assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
}
@Test
public void testPatternWithoutAGlob() {
MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
};
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.psubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(2);
mockSubscriber.punsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
}
private Jedis getConnection() {
Exception lastException = null;
for (int i = 0; i < 10; i++) {
Jedis client = null;
try {
client = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
client.ping();
return client;
} catch (Exception e) {
lastException = e;
if (client != null) {
try {
client.close();
} catch (Exception ignore) {
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
runit("docker", "ps", "-a");
throw new RuntimeException("Tried 10 times, but could not get a good connection.",
lastException);
}
// TODO: This exists for debugging some flakiness in this test. If this is fully resolved in the
// future, please delete this and the associated dependency in build.gradle
private void runit(String command, String... args) {
StreamConsumer consumer = stream -> {
InputStreamReader inputStreamReader = new InputStreamReader(stream);
BufferedReader bufReader = new BufferedReader(inputStreamReader);
String line;
while ((line = bufReader.readLine()) != null) {
System.err.println("::::: " + line);
}
};
new ProcBuilder(command)
.withArgs(args)
.withOutputConsumer(consumer)
.withTimeoutMillis(60000)
.run();
}
int doPublishing(int index, int minimumIterations, AtomicBoolean running) {
int iterationCount = 0;
int publishedMessages = 0;
Jedis client = getConnection();
try {
while (iterationCount < minimumIterations || running.get()) {
publishedMessages += client.publish("my-channel", "boo-" + index + "-" + iterationCount);
iterationCount++;
}
} finally {
client.close();
}
return publishedMessages;
}
int makeSubscribers(int minimumIterations, AtomicBoolean running)
throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(100);
Queue<Future<Void>> workQ = new ConcurrentLinkedQueue<>();
Future<Integer> consumer = executor.submit(() -> {
int subscribersProcessed = 0;
while (subscribersProcessed < minimumIterations || running.get()) {
if (workQ.isEmpty()) {
Thread.yield();
continue;
}
Future<Void> f = workQ.poll();
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
subscribersProcessed++;
}
return subscribersProcessed;
});
int iterationCount = 0;
String channel = "my-channel";
while (iterationCount < minimumIterations || running.get()) {
Future<Void> f = executor.submit(() -> {
Jedis client = getConnection();
ExecutorService secondaryExecutor = Executors.newSingleThreadExecutor();
MockSubscriber mockSubscriber = new MockSubscriber();
AtomicReference<Thread> innerThread = new AtomicReference<>();
Future<Void> inner = secondaryExecutor.submit(() -> {
innerThread.set(Thread.currentThread());
client.subscribe(mockSubscriber, channel);
return null;
});
mockSubscriber.awaitSubscribe(channel);
if (inner.isDone()) {
throw new RuntimeException("inner completed before unsubscribe");
}
mockSubscriber.unsubscribe(channel);
try {
inner.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
LogService.getLogger().debug("=> {} {}", innerThread.get(), innerThread.get().getState());
for (StackTraceElement st : innerThread.get().getStackTrace()) {
LogService.getLogger().debug("-> {}", st);
}
throw new RuntimeException("inner.get() errored after unsubscribe: " + e.getMessage());
}
mockSubscriber.awaitUnsubscribe(channel);
client.close();
secondaryExecutor.shutdownNow();
return null;
});
workQ.add(f);
iterationCount++;
}
int result = consumer.get();
executor.shutdownNow();
return result;
}
@Test
public void concurrentSubscribers_andPublishers_doesNotHang()
throws InterruptedException, ExecutionException {
AtomicBoolean running = new AtomicBoolean(true);
Future<Integer> makeSubscribersFuture1 =
executor.submit(() -> makeSubscribers(10000, running));
Future<Integer> makeSubscribersFuture2 =
executor.submit(() -> makeSubscribers(10000, running));
Future<Integer> publish1 = executor.submit(() -> doPublishing(1, 10000, running));
Future<Integer> publish2 = executor.submit(() -> doPublishing(2, 10000, running));
Future<Integer> publish3 = executor.submit(() -> doPublishing(3, 10000, running));
Future<Integer> publish4 = executor.submit(() -> doPublishing(4, 10000, running));
Future<Integer> publish5 = executor.submit(() -> doPublishing(5, 10000, running));
running.set(false);
assertThat(makeSubscribersFuture1.get()).isGreaterThanOrEqualTo(10);
assertThat(makeSubscribersFuture2.get()).isGreaterThanOrEqualTo(10);
assertThat(publish1.get()).isGreaterThan(0);
assertThat(publish2.get()).isGreaterThan(0);
assertThat(publish3.get()).isGreaterThan(0);
assertThat(publish4.get()).isGreaterThan(0);
assertThat(publish5.get()).isGreaterThan(0);
}
private void waitFor(Callable<Boolean> booleanCallable) {
GeodeAwaitility.await()
.ignoreExceptions() // ignoring socket closed exceptions
.until(booleanCallable);
}
}