clear channel when channelInactive (#3966)

### Motivation

I am stopping a pulsar broker, and I observed a lot of the following logs in the broker log:

```
17:48:35.894 [shutdown-thread-43-1] INFO  org.apache.bookkeeper.proto.PerChannelBookieClient - Closing the per channel bookie client for 10.184.xx.xx:3181
17:48:35.894 [shutdown-thread-43-1] ERROR io.netty.util.concurrent.DefaultPromise.rejectedExecution - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
        at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:842) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]
        at org.apache.bookkeeper.proto.PerChannelBookieClient.closeChannel(PerChannelBookieClient.java:1090) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.bookkeeper.proto.PerChannelBookieClient.closeInternal(PerChannelBookieClient.java:1079) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.bookkeeper.proto.PerChannelBookieClient.close(PerChannelBookieClient.java:1063) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.close(DefaultPerChannelBookieClientPool.java:157) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.bookkeeper.proto.BookieClientImpl.close(BookieClientImpl.java:587) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.bookkeeper.client.BookKeeper.close(BookKeeper.java:1435) ~[org.apache.bookkeeper-bookkeeper-server-4.14.2.jar:4.14.2]
        at org.apache.pulsar.broker.ManagedLedgerClientFactory.close(ManagedLedgerClientFactory.java:142) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
        at org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:417) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
        at org.apache.pulsar.broker.MessagingServiceShutdownHook.lambda$run$1(MessagingServiceShutdownHook.java:62) ~[org.apache.pulsar-pulsar-broker-2.8.1.jar:2.8.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
```

When I stopped the broker, there were more than 30,000 lines in such a log, which seriously polluted my normal log of the broker.
<img width="1180" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/0ec3c423-14f1-41c4-85c3-d978ece15044">

According to my next investigation,
1. When the broker stops, it will close the ManagedLedgerClientFactory. It will close all bookkeeper connections

https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java#L154
```java
    @Override
    public void close(boolean wait) {
        for (int i = 0; i < clients.length; i++) {
            clients[i].close(wait);
            if (clients != clientsV3Enforced) {
                clientsV3Enforced[i].close(wait);
            }
        }
    }

    private void closeInternal(boolean permanent, boolean wait) {
        Channel toClose = null;
        synchronized (this) {
            if (permanent) {
                state = ConnectionState.CLOSED;
            } else if (state != ConnectionState.CLOSED) {
                state = ConnectionState.DISCONNECTED;
            }
            toClose = channel;
            channel = null;
            makeWritable();
        }
        if (toClose != null) {   <== a. toClose not null
            ChannelFuture cf = closeChannel(toClose);
            if (wait) {
                cf.awaitUninterruptibly();
            }
        }
    }

    private ChannelFuture closeChannel(Channel c) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing channel {}", c);
        }
        return c.close().addListener(x -> makeWritable());  <==  b. here if channel is already inactive, add the listener will throw exception
    }
```


2. When the bookie server first disconnects the client, the client will trigger channelInactive
```java
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOG.info("Disconnected from bookie channel {}", ctx.channel());
        if (ctx.channel() != null) {
            closeChannel(ctx.channel());  <== c. here channel is closed
            if (ctx.channel().pipeline().get(SslHandler.class) != null) {
                activeTlsChannelCounter.dec();
            } else {
                activeNonTlsChannelCounter.dec();
            }
        }

        errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
        errorOutPendingOps(BKException.Code.BookieHandleNotAvailableException);

        synchronized (this) {
            if (this.channel == ctx.channel()
                && state != ConnectionState.CLOSED) {
                state = ConnectionState.DISCONNECTED;
            }
        }

        // we don't want to reconnect right away. If someone sends a request to
        // this address, we will reconnect.
    }
```

If step c happens before b, then b throws the above exception.

step c
<img width="1400" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/4a33855c-5682-4812-ae0e-1a02972ef502">
<img width="1407" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/1c99bccb-b2bd-4cfb-ac4b-ee10b2d236b0">


step a and b
<img width="1403" alt="image" src="https://github.com/apache/bookkeeper/assets/35599757/fec372c5-f97b-47e7-b5d7-ebbaa6edf682">

So when channelInactive, we should reset the channel to null after closed the channel.
Here, when the channel is null, we will reconnect after getting the client, which is consistent with the current behavior.
![image](https://github.com/apache/bookkeeper/assets/35599757/d1178697-27bd-44a5-acd1-33ccec99e7a4)


1 file changed
tree: 37fdec55c0f4c60d674cdf46043561daf8dc9e32
  1. .github/
  2. .test-infra/
  3. bin/
  4. bookkeeper-benchmark/
  5. bookkeeper-common/
  6. bookkeeper-common-allocator/
  7. bookkeeper-dist/
  8. bookkeeper-http/
  9. bookkeeper-proto/
  10. bookkeeper-server/
  11. bookkeeper-slogger/
  12. buildtools/
  13. circe-checksum/
  14. conf/
  15. cpu-affinity/
  16. deploy/
  17. dev/
  18. docker/
  19. metadata-drivers/
  20. microbenchmarks/
  21. native-io/
  22. shaded/
  23. site3/
  24. src/
  25. stats/
  26. stream/
  27. tests/
  28. testtools/
  29. tools/
  30. .asf.yaml
  31. .dlc.json
  32. .gitignore
  33. CONTRIBUTING.md
  34. LICENSE
  35. NOTICE
  36. pom.xml
  37. README.md
README.md

Maven Central

Apache BookKeeper

Apache BookKeeper is a scalable, fault tolerant and low latency storage service optimized for append-only workloads.

It is suitable for being used in following scenarios:

  • WAL (Write-Ahead-Logging), e.g. HDFS NameNode, Pravega.
  • Message Store, e.g. Apache Pulsar.
  • Offset/Cursor Store, e.g. Apache Pulsar.
  • Object/Blob Store, e.g. storing state machine snapshots.

Get Started

  • Checkout the project website.
  • Concepts: Start with the basic concepts of Apache BookKeeper. This will help you to fully understand the other parts of the documentation.
  • Follow the Install guide to setup BookKeeper.

Documentation

Please visit the Documentation from the project website for more information.

Get In Touch

Report a Bug

For filing bugs, suggesting improvements, or requesting new features, help us out by opening a Github issue.

Need Help?

Subscribe or mail the user@bookkeeper.apache.org list - Ask questions, find answers, and also help other users.

Subscribe or mail the dev@bookkeeper.apache.org list - Join development discussions, propose new ideas and connect with contributors.

Join us on Slack - This is the most immediate way to connect with Apache BookKeeper committers and contributors.

Contributing

We feel that a welcoming open community is important and welcome contributions.

Contributing Code

  1. See our installation guide to get your local environment setup.

  2. Take a look at our open issues: Github Issues.

  3. Review our coding style and follow our pull requests to learn more about our conventions.

  4. Make your changes according to our contributing guide