Delay setting wait time
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index aabdd9b..91d7ba3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -62,7 +62,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -376,7 +375,7 @@
if (null != session) {
return session;
}
- session = new ClientSessionImpl(this, endpoints);
+ session = new ClientSessionImpl(this, clientConfiguration.getRequestTimeout(), endpoints);
sessionsTable.put(endpoints, session);
return session;
} finally {
@@ -388,7 +387,7 @@
* Triggered when {@link TopicRouteData} is fetched from remote.
*/
public void onTopicRouteDataFetched(String topic, TopicRouteData topicRouteData) throws ClientException,
- ExecutionException, InterruptedException, TimeoutException {
+ ExecutionException, InterruptedException {
final Set<Endpoints> routeEndpoints = topicRouteData
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index ba9a580..2d69151 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,12 +22,12 @@
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.misc.ClientId;
@@ -41,17 +41,21 @@
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = Duration.ofSeconds(1);
private static final Logger log = LoggerFactory.getLogger(ClientSessionImpl.class);
- private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
+ private static final Duration SETTINGS_INITIALIZATION_TIMEOUT = Duration.ofSeconds(3);
private final ClientSessionHandler sessionHandler;
private final Endpoints endpoints;
private final SettableFuture<Settings> settingsSettableFuture;
private volatile StreamObserver<TelemetryCommand> requestObserver;
- protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints endpoints) throws ClientException {
+ @SuppressWarnings("UnstableApiUsage")
+ protected ClientSessionImpl(ClientSessionHandler sessionHandler, Duration tolerance, Endpoints endpoints)
+ throws ClientException {
this.sessionHandler = sessionHandler;
this.endpoints = endpoints;
this.settingsSettableFuture = SettableFuture.create();
+ Futures.withTimeout(settingsSettableFuture, SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+ TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
this.requestObserver = sessionHandler.telemetry(endpoints, this);
}
@@ -77,9 +81,9 @@
syncSettings0();
}
- protected void syncSettings() throws TimeoutException, ExecutionException, InterruptedException {
+ protected void syncSettings() throws ExecutionException, InterruptedException {
this.syncSettings0();
- settingsSettableFuture.get(SETTINGS_INITIALIZATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ settingsSettableFuture.get();
}
private void syncSettings0() {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index e693513..2c6b980 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -29,11 +29,12 @@
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import io.grpc.stub.StreamObserver;
+import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -46,13 +47,14 @@
@SuppressWarnings("unchecked")
@Test
- public void syncSettings() throws ClientException, ExecutionException, InterruptedException, TimeoutException {
+ public void syncSettings() throws ClientException, ExecutionException, InterruptedException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doNothing().when(requestObserver).onNext(any(TelemetryCommand.class));
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class), any(Settings.class));
@@ -70,10 +72,11 @@
public void testOnNextWithRecoverOrphanedTransactionCommand() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
any(RecoverOrphanedTransactionCommand.class));
@@ -89,10 +92,11 @@
public void testOnNextWithVerifyMessageCommand() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
any(VerifyMessageCommand.class));
@@ -108,10 +112,11 @@
public void testOnNextWithPrintThreadStackTraceCommand() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
any(PrintThreadStackTraceCommand.class));
@@ -127,10 +132,11 @@
public void testOnNextWithUnrecognizedCommand() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class), any(Settings.class));
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
@@ -155,11 +161,12 @@
public void testOnError() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
Mockito.doNothing().when(requestObserver).onCompleted();
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doReturn(true).when(sessionHandler).isRunning();
Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
@@ -167,7 +174,7 @@
clientSession.onError(e);
Mockito.verify(sessionHandler, times(1)).isRunning();
Mockito.verify(requestObserver, times(1)).onCompleted();
- Mockito.verify(sessionHandler, times(1)).getScheduler();
+ Mockito.verify(sessionHandler, times(2)).getScheduler();
}
@Test
@@ -175,18 +182,19 @@
public void testOnErrorWithSessionHandlerIsNotRunning() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
Mockito.doNothing().when(requestObserver).onCompleted();
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doReturn(false).when(sessionHandler).isRunning();
final Exception e = new Exception();
clientSession.onError(e);
Mockito.verify(sessionHandler, times(1)).isRunning();
Mockito.verify(requestObserver, times(1)).onCompleted();
- Mockito.verify(sessionHandler, never()).getScheduler();
+ Mockito.verify(sessionHandler, times(1)).getScheduler();
}
@Test
@@ -194,11 +202,12 @@
public void testOnCompleted() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
Mockito.doNothing().when(requestObserver).onCompleted();
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doReturn(true).when(sessionHandler).isRunning();
Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
@@ -206,7 +215,7 @@
clientSession.onCompleted();
Mockito.verify(sessionHandler, times(1)).isRunning();
Mockito.verify(requestObserver, times(1)).onCompleted();
- Mockito.verify(sessionHandler, times(1)).getScheduler();
+ Mockito.verify(sessionHandler, times(2)).getScheduler();
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
.untilAsserted(() -> {
Mockito.verify(sessionHandler, times(1)).isEndpointsDeprecated(eq(endpoints));
@@ -219,16 +228,17 @@
public void testOnCompletedWithSessionHandlerIsNotRunning() throws ClientException {
final Endpoints endpoints = fakeEndpoints();
final ClientSessionHandler sessionHandler = Mockito.mock(ClientSessionHandler.class);
+ Mockito.when(sessionHandler.getScheduler()).thenReturn(new ScheduledThreadPoolExecutor(1));
final StreamObserver<TelemetryCommand> requestObserver = Mockito.mock(StreamObserver.class);
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
any(StreamObserver.class));
Mockito.doNothing().when(requestObserver).onCompleted();
- final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, endpoints);
+ final ClientSessionImpl clientSession = new ClientSessionImpl(sessionHandler, Duration.ofSeconds(3), endpoints);
Mockito.doReturn(FAKE_CLIENT_ID).when(sessionHandler).getClientId();
Mockito.doReturn(false).when(sessionHandler).isRunning();
clientSession.onCompleted();
Mockito.verify(sessionHandler, times(1)).isRunning();
Mockito.verify(requestObserver, times(1)).onCompleted();
- Mockito.verify(sessionHandler, never()).getScheduler();
+ Mockito.verify(sessionHandler, times(1)).getScheduler();
}
}
\ No newline at end of file