blob: a64cc5f049b8d0fac850f6a876390fe689c74a55 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.runtime;
import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
/**
* An abstract class for general connector connection management.
* <p>
* The basic (default) model is to maintain a connection across
* inadvertent disconnects.
* <p>
* A connected connector is returned
* whenever {@link #client()} is called. If the connection is lost,
* actions are taken to reestablish it. Overall tracking and
* control of connection state is performed and
* general logging is performed.
* <p>
* Sub-classes are able to further control when initial connection
* occurs and generally expose ways to explicitly disconnect or connect.
* <p>
* Optionally, clients can request automatic disconnection after
* a period of inactivity - see {@link #setIdleTimeout(long, TimeUnit)}.
* Support for this feature requires connector implementations
* to call {@link #notIdle()}.
* <p>
* TODO - dynamic control:
* <ul>
* <li>[abstract] ConnectorControl { getState(), connect(), disconnect(), ...}.</li>
* <li>MqttConnectorControl extends ConnectorControl. Adds subscribe(), unsubscribe().</li>
* <li>Add Consumer&lt;MqttConnectorControl&gt; to MqttConfig. Runtime calls it supplying a connector control object.
* By doing a disconnect()/connect(), the Supplier&lt;MqttConfig&gt; will be called to as part of reconnect -- hence using updated values if any.</li>
* </ul>
* <p>
* Sub-classes are responsible for implementing a small number
* of operations: {@link #doConnect(Object)}, {@link #doDisconnect(Object)},
* {@link #doClose(Object)}, and {@link #id()}. Sub-classes are also responsible for
* calling {@link #connectionLost(Throwable)} and {@link #notIdle()}.
*
* @param <T> type of the underlying managed connector (e.g., MqttClient)
*/
public abstract class Connector<T> implements AutoCloseable, Serializable {
private static final long serialVersionUID = 1L;
private static final long BASE_RETRY_WAIT_MSEC = 2*1000;
private static final long MAX_RETRY_WAIT_MSEC = 60*1000;
private final IdleManager idleManager;
private State state = State.DISCONNECTED;
private T client; // must be non-null when state==CONNECTED
private Future<?> connectFuture;
// TODO proper "operator-acquired" thread/scheduler w/thread factory, etc
// hmm... today a single [Mqtt]Connector supports multiple publish operators
// and a subscribe operator, hence the threads should come from... ?
private final ExecutorService connectExecutor = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
public enum State {
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
CLOSING,
CLOSED
}
protected Connector() {
idleManager = new IdleManager(this, schedExecutor);
}
public abstract Logger getLogger();
/**
* Must be called by the connector when an action is performed that
* qualifies as a "not idle" condition.
*/
public void notIdle() {
idleManager.notIdle();
}
/**
* Disconnect the connector after the specified period of inactivity.
* @param idleTimeout idleTimeout(long)
* @param unit TimeUnit
*/
public void setIdleTimeout(long idleTimeout, TimeUnit unit) {
idleManager.setIdleTimeoutMsec(unit.toMillis(idleTimeout));
}
public void setIdleReconnectInterval(int intervalSec) {
idleManager.setIdleReconnectInterval(intervalSec);
}
/**
* Get a connected client connector.
* @return the client connector
* @throws IllegalStateException if state is inappropriate to return connected client connector
* @throws Exception if unable to connect
*/
public T client() throws Exception {
return connectInternal();
}
// Expose with client driven dynamic disconnect/reconnect
// /**
// * Connect the client connector if necessary.
// * Blocks until connected or it's no longer valid to request a connect().
// * @throws IllegalStateException if a connect() is (no longer) appropriate.
// */
// public void connect() throws Exception {
// connectInternal();
// }
/**
* Connect the client connector if necessary.
* Blocks until connected or it's no longer valid to request a connect().
* @return connected client connector
* @throws IllegalStateException if a connect() is (no longer) appropriate.
*/
private T connectInternal() throws Exception {
// N.B. have to deal with multiple concurrent connect requests
// e.g., multiple publishers and a single subscriber for a connector.
Future<?> f = null;
synchronized(this) {
if (state == State.CONNECTED)
return client;
else if (state == State.CONNECTING)
f = connectFuture;
else if (state == State.DISCONNECTED) {
startAsyncConnect();
f = connectFuture;
}
else
throw wrongState(state, "connectInternal()");
}
awaitDone(f); // throws if task not successful
return connectedClient();
}
private IllegalStateException wrongState(State s, String op) {
String msg = String.format("%s %s wrong state %s", id(), op, s);
getLogger().error(msg);
return new IllegalStateException(msg);
}
/**
* Get the connected client; throw if not connected.
* @return connected client
* @throws IllegalStateException if not connected
*/
private T connectedClient() {
synchronized(this) {
if (state != State.CONNECTED)
throw wrongState(state, "connectedClient()");
return client;
}
}
/**
* Schedule an async connect task.
* <p>
* Updates connectFuture if successfully initiated.
* Updates state and client once successfully completed.
* @throws IllegalStateException if inappropriate state for async connect request
*/
private void startAsyncConnect() {
synchronized(this) {
if (state != State.DISCONNECTED)
throw wrongState(state, "startAsyncConnect()");
getLogger().trace("{} submitting async connect task", id());
setStateUnsafe(State.CONNECTING);
connectFuture = connectExecutor.submit(
() -> { connectTask(); return null; });
}
}
/**
* Wait till task is done.
* @throws Exception if task wasn't successful
*/
private void awaitDone(Future<?> f) throws Exception {
try {
getLogger().trace("{} awaiting done", id());
f.get();
}
catch (InterruptedException e) {
// assume that if we're cancelled we want to cancel the future task
getLogger().trace("{} awaitDone() interrupted, cancelling task", id());
f.cancel(true);
throw e;
}
catch (CancellationException e) {
String msg = String.format("%s awaitDone() task was cancelled", id());
getLogger().trace(msg);
throw new IllegalStateException(msg);
}
catch (ExecutionException e) {
String msg = String.format("%s awaitDone() task failed", id());
getLogger().error(msg);
throw new IllegalStateException(msg, e.getCause());
}
}
/**
* Do a blocking connect with retries.
* <p>
* Updates state and client when successful.
* @throws IllegalStateException if state no longer appropriate for connect.
* @throws Exception some other problem.
*/
private void connectTask() throws Exception {
int retryCnt = 0;
while(true) {
try {
oneConnect(retryCnt++);
return;
}
catch (IllegalStateException e) {
throw e; // already logged
}
catch (Exception e) {
// already logged
long msec = getConnectRetryDelayMsec(retryCnt);
getLogger().info("{} connectTask() waiting {}msec to retry", id(), msec);
Thread.sleep(msec);
}
}
}
/**
* Do a blocking single connect attempt.
* <p>
* Updates state and client and idle task when successful.
* @throws IllegalStateException if state no longer appropriate for connect.
* @throws Exception if connect failed.
*/
private void oneConnect(int tryCnt) throws Exception {
synchronized(this) {
if (state != State.CONNECTING)
throw wrongState(state, "oneConnect()");
}
getLogger().trace("{} doing one connect", id());
T result;
try {
result = doConnect(client);
}
catch (Exception e) {
getLogger().error("{} doConnect() failed", id(), e);
throw e;
}
getLogger().trace("{} connected", id());
synchronized(this) {
if (state != State.CONNECTING) {
// need to disconnect/close result?
throw wrongState(state, "oneConnect()");
}
setStateUnsafe(State.CONNECTED);
client = result;
idleManager.connected();
}
}
/**
* Get the next connect retry delay.
* @param retryCnt the retry attempt number
* @return the delay interval in msec
*/
private long getConnectRetryDelayMsec(int retryCnt) {
int factor = retryCnt <= 1 ? 1 : 2 << Math.min(retryCnt - 2, 8);
return Math.min(BASE_RETRY_WAIT_MSEC * factor, MAX_RETRY_WAIT_MSEC);
}
// Expose with client driven dynamic disconnect/reconnect
// /**
// * Disconnect the client connector.
// * <p>
// * Subsequently, a connect() may performed to reconnect.
// * @throws IllegalStateException inappropriate state to request disconnect
// */
// public void disconnect() {
// disconnect(false);
// }
/**
* Disconnect the client connector.
* <p>
* Subsequently, a connect() may performed to reconnect.
* @param wasIdle true if being disconnected due to an idle connection
* @throws IllegalStateException inappropriate state to request disconnect
*/
void disconnect(boolean wasIdle) {
synchronized(this) {
if (!(state == State.CONNECTED || state == State.CONNECTING))
throw wrongState(state, "disconnect("+wasIdle+")");
try {
getLogger().trace("Connection {} disconnecting wasIdle:{}", id(), wasIdle);
setStateUnsafe(State.DISCONNECTING);
cancelConnectTaskUnsafe();
doDisconnect(client);
}
catch (Exception e) {
getLogger().error("{} disconnnect() failed", id(), e);
}
finally {
setStateUnsafe(State.DISCONNECTED);
idleManager.disconnected(wasIdle);
}
}
}
/**
* Permanently close the client connector.
* <p>
* Expect any subsequent operations to fail.
* @throws Exception shouldn't happen
*/
@Override
public void close() throws Exception {
synchronized(this) {
if (state == State.CLOSED) {
getLogger().trace("{} close() state already {}", id(), state);
return;
}
try {
getLogger().info("Connection {} closing", id());
setStateUnsafe(State.CLOSING);
idleManager.close();
cancelConnectTaskUnsafe();
if (client != null)
doClose(client);
}
catch (Exception e) {
getLogger().error("{} close() failed", id(), e);
}
finally {
setStateUnsafe(State.CLOSED);
connectFuture = null;
client = null;
connectExecutor.shutdownNow();
schedExecutor.shutdownNow();
}
}
}
/**
* Cancel a pending connect task if any.
*/
private void cancelConnectTaskUnsafe() {
if (connectFuture != null && !connectFuture.isDone()) {
getLogger().trace("{} cancelConnect()", id());
connectFuture.cancel(true);
}
}
/**
* To be called by the connector when it detects a connection lost condition.
* <p>
* An asynchronous reconnect is initiated if appropriate.
* @param t the cause. may be null.
*/
protected void connectionLost(Throwable t) {
synchronized(this) {
getLogger().info("Connection {} connectionLost()", id(), t);
if (state == State.CONNECTED) {
setStateUnsafe(State.DISCONNECTED);
// don't allow unwind
try {
startAsyncConnect();
}
catch (Exception e) {
getLogger().error("{} startAsyncConnect() failed", id(), e);
}
}
else {
getLogger().trace("{} connectionLost() state already {}", id(), state);
}
}
}
private void setStateUnsafe(State state) {
State prev = this.state;
this.state = state;
getLogger().info("{} state {} (was {})", id(), state, prev);
}
State getState() {
synchronized(this) {
return state;
}
}
/**
* A one-shot request to connect the client to its server.
* @param client the connector's client object. may be null.
* @return a connected client. not necessarily the same as {@code client}.
* @throws Exception if unable to connect
*/
protected abstract T doConnect(T client) throws Exception;
/**
* A one-shot request to disconnect the client.
* @param client the connector's client object.
* @throws Exception if unable to disconnect
*/
protected abstract void doDisconnect(T client) throws Exception;
/**
* A one-shot request to permanently close the client.
* @param client the connector's client object.
* @throws Exception if unable to close
*/
protected abstract void doClose(T client) throws Exception;
/**
* Get a connector id to use in log and exception msgs
* @return the id
*/
protected abstract String id();
}