NIFI-8791 Removed unused ChannelSender and implementations - Removed unused PruneResult inner class from AbstractPutEventProcessor

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5221.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 1b2798c..41afeed 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -266,31 +266,6 @@
     }
 
     /**
-     * The results from pruning connections.
-     */
-    protected static class PruneResult {
-
-        private final int numClosed;
-
-        private final int numConsidered;
-
-        public PruneResult(final int numClosed, final int numConsidered) {
-            this.numClosed = numClosed;
-            this.numConsidered = numConsidered;
-        }
-
-        public int getNumClosed() {
-            return numClosed;
-        }
-
-        public int getNumConsidered() {
-            return numConsidered;
-        }
-
-    }
-
-
-    /**
      * Represents a range of messages from a FlowFile.
      */
     protected static class Range {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
deleted file mode 100644
index 278a9ab..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Base class for sending messages over a channel.
- */
-public abstract class ChannelSender {
-
-    protected final int port;
-    protected final String host;
-    protected final int maxSendBufferSize;
-    protected final ComponentLog logger;
-
-    protected volatile int timeout = 10000;
-    protected volatile long lastUsed;
-
-    public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        this.port = port;
-        this.host = host;
-        this.maxSendBufferSize = maxSendBufferSize;
-        this.logger = logger;
-    }
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    /**
-     * @return the last time data was sent over this channel
-     */
-    public long getLastUsed() {
-        return lastUsed;
-    }
-
-    /**
-     * Opens the connection to the destination.
-     *
-     * @throws IOException if an error occurred opening the connection.
-     */
-    public abstract void open() throws IOException;
-
-    /**
-     * Sends the given string over the channel.
-     *
-     * @param message the message to send over the channel
-     * @throws IOException if there was an error communicating over the channel
-     */
-    public void send(final String message, final Charset charset) throws IOException {
-        final byte[] bytes = message.getBytes(charset);
-        send(bytes);
-    }
-
-    /**
-     * Sends the given data over the channel.
-     *
-     * @param data the data to send over the channel
-     * @throws IOException if there was an error communicating over the channel
-     */
-    public void send(final byte[] data) throws IOException {
-        try {
-            write(data);
-            lastUsed = System.currentTimeMillis();
-        } catch (IOException e) {
-            // failed to send data over the channel, we close it to force
-            // the creation of a new one next time
-            close();
-            throw e;
-        }
-    }
-
-    /**
-     * Write the given buffer to the underlying channel.
-     */
-    protected abstract void write(byte[] data) throws IOException;
-
-    /**
-     * @return true if the underlying channel is connected
-     */
-    public abstract boolean isConnected();
-
-    /**
-     * Close the underlying channel
-     */
-    public abstract void close();
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
deleted file mode 100644
index 0b2dfb8..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-/**
- * Sends messages over a DatagramChannel.
- */
-public class DatagramChannelSender extends ChannelSender {
-
-    private DatagramChannel channel;
-
-    public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-    }
-
-    @Override
-    public void open() throws IOException {
-        if (channel == null) {
-            channel = DatagramChannel.open();
-
-            if (maxSendBufferSize > 0) {
-                channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
-                final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
-                if (actualSendBufSize < maxSendBufferSize) {
-                    logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
-                            + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
-                            + "consider changing the Operating System's maximum receive buffer");
-                }
-            }
-        }
-
-        if (!channel.isConnected()) {
-            channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
-        }
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        while (buffer.hasRemaining()) {
-            channel.write(buffer);
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return channel != null && channel.isConnected();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(channel);
-        channel = null;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
deleted file mode 100644
index e2f05cc..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Sends messages over an SSLSocketChannel.
- */
-public class SSLSocketChannelSender extends SocketChannelSender {
-
-    private SSLContext sslContext;
-    private SSLSocketChannel sslChannel;
-    private SSLSocketChannelOutputStream sslOutputStream;
-
-    public SSLSocketChannelSender(final String host,
-                                  final int port,
-                                  final int maxSendBufferSize,
-                                  final SSLContext sslContext,
-                                  final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-        this.sslContext = sslContext;
-    }
-
-    @Override
-    public void open() throws IOException {
-        if (sslChannel == null) {
-            super.open();
-            sslChannel = new SSLSocketChannel(sslContext, channel, true);
-        }
-        sslChannel.setTimeout(timeout);
-
-        // SSLSocketChannel will check if already connected so we can safely call this
-        sslChannel.connect();
-        sslOutputStream = new SSLSocketChannelOutputStream(sslChannel);
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        sslChannel.write(data);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return sslChannel != null && !sslChannel.isClosed();
-    }
-
-    @Override
-    public void close() {
-        // Close SSLSocketChannel before closing other resources
-        IOUtils.closeQuietly(sslChannel);
-        IOUtils.closeQuietly(sslOutputStream);
-        super.close();
-        sslChannel = null;
-    }
-
-    @Override
-    public OutputStream getOutputStream() {
-        return sslOutputStream;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
deleted file mode 100644
index 9c1aa32..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.StandardSocketOptions;
-import java.nio.channels.SocketChannel;
-
-/**
- * Sends messages over a SocketChannel.
- */
-public class SocketChannelSender extends ChannelSender {
-
-    protected SocketChannel channel;
-    protected SocketChannelOutputStream socketChannelOutput;
-
-    public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
-        super(host, port, maxSendBufferSize, logger);
-    }
-
-    @Override
-    public void open() throws IOException {
-        try {
-            if (channel == null) {
-                channel = SocketChannel.open();
-                channel.configureBlocking(false);
-
-                if (maxSendBufferSize > 0) {
-                    channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
-                    final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
-                    if (actualSendBufSize < maxSendBufferSize) {
-                        logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
-                                + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
-                                + "consider changing the Operating System's maximum send buffer");
-                    }
-                }
-            }
-
-            if (!channel.isConnected()) {
-                final long startTime = System.currentTimeMillis();
-                final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
-
-                if (!channel.connect(socketAddress)) {
-                    while (!channel.finishConnect()) {
-                        if (System.currentTimeMillis() > startTime + timeout) {
-                            throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
-                        }
-
-                        try {
-                            Thread.sleep(50L);
-                        } catch (final InterruptedException e) {
-                        }
-                    }
-                }
-
-                if (logger.isDebugEnabled()) {
-                    final SocketAddress localAddress = channel.getLocalAddress();
-                    if (localAddress != null && localAddress instanceof InetSocketAddress) {
-                        final InetSocketAddress inetSocketAddress = (InetSocketAddress) localAddress;
-                        logger.debug("Connected to local port {}", new Object[] {inetSocketAddress.getPort()});
-                    }
-                }
-
-                socketChannelOutput = new SocketChannelOutputStream(channel);
-                socketChannelOutput.setTimeout(timeout);
-            }
-        } catch (final IOException e) {
-            IOUtils.closeQuietly(channel);
-            throw e;
-        }
-    }
-
-    @Override
-    protected void write(byte[] data) throws IOException {
-        socketChannelOutput.write(data);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return channel != null && channel.isConnected();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(socketChannelOutput);
-        IOUtils.closeQuietly(channel);
-        socketChannelOutput = null;
-        channel = null;
-    }
-
-    public OutputStream getOutputStream() {
-        return new OutputStream() {
-            @Override
-            public void write(int b) throws IOException {
-               socketChannelOutput.write(b);
-            }
-
-            @Override
-            public void write(byte[] b) throws IOException {
-                socketChannelOutput.write(b);
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                socketChannelOutput.write(b, off, len);
-            }
-
-            @Override
-            public void close() throws IOException {
-                socketChannelOutput.close();
-            }
-
-            @Override
-            public void flush() throws IOException {
-                socketChannelOutput.flush();
-                updateLastUsed();
-            }
-        };
-    }
-
-    private void updateLastUsed() {
-        this.lastUsed = System.currentTimeMillis();
-    }
-
-}