NIFI-8791 Removed unused ChannelSender and implementations - Removed unused PruneResult inner class from AbstractPutEventProcessor
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5221.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 1b2798c..41afeed 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -266,31 +266,6 @@
}
/**
- * The results from pruning connections.
- */
- protected static class PruneResult {
-
- private final int numClosed;
-
- private final int numConsidered;
-
- public PruneResult(final int numClosed, final int numConsidered) {
- this.numClosed = numClosed;
- this.numConsidered = numConsidered;
- }
-
- public int getNumClosed() {
- return numClosed;
- }
-
- public int getNumConsidered() {
- return numConsidered;
- }
-
- }
-
-
- /**
* Represents a range of messages from a FlowFile.
*/
protected static class Range {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
deleted file mode 100644
index 278a9ab..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Base class for sending messages over a channel.
- */
-public abstract class ChannelSender {
-
- protected final int port;
- protected final String host;
- protected final int maxSendBufferSize;
- protected final ComponentLog logger;
-
- protected volatile int timeout = 10000;
- protected volatile long lastUsed;
-
- public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
- this.port = port;
- this.host = host;
- this.maxSendBufferSize = maxSendBufferSize;
- this.logger = logger;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- /**
- * @return the last time data was sent over this channel
- */
- public long getLastUsed() {
- return lastUsed;
- }
-
- /**
- * Opens the connection to the destination.
- *
- * @throws IOException if an error occurred opening the connection.
- */
- public abstract void open() throws IOException;
-
- /**
- * Sends the given string over the channel.
- *
- * @param message the message to send over the channel
- * @throws IOException if there was an error communicating over the channel
- */
- public void send(final String message, final Charset charset) throws IOException {
- final byte[] bytes = message.getBytes(charset);
- send(bytes);
- }
-
- /**
- * Sends the given data over the channel.
- *
- * @param data the data to send over the channel
- * @throws IOException if there was an error communicating over the channel
- */
- public void send(final byte[] data) throws IOException {
- try {
- write(data);
- lastUsed = System.currentTimeMillis();
- } catch (IOException e) {
- // failed to send data over the channel, we close it to force
- // the creation of a new one next time
- close();
- throw e;
- }
- }
-
- /**
- * Write the given buffer to the underlying channel.
- */
- protected abstract void write(byte[] data) throws IOException;
-
- /**
- * @return true if the underlying channel is connected
- */
- public abstract boolean isConnected();
-
- /**
- * Close the underlying channel
- */
- public abstract void close();
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
deleted file mode 100644
index 0b2dfb8..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-/**
- * Sends messages over a DatagramChannel.
- */
-public class DatagramChannelSender extends ChannelSender {
-
- private DatagramChannel channel;
-
- public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
- super(host, port, maxSendBufferSize, logger);
- }
-
- @Override
- public void open() throws IOException {
- if (channel == null) {
- channel = DatagramChannel.open();
-
- if (maxSendBufferSize > 0) {
- channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
- final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
- if (actualSendBufSize < maxSendBufferSize) {
- logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
- + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
- + "consider changing the Operating System's maximum receive buffer");
- }
- }
- }
-
- if (!channel.isConnected()) {
- channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
- }
- }
-
- @Override
- protected void write(byte[] data) throws IOException {
- ByteBuffer buffer = ByteBuffer.wrap(data);
- while (buffer.hasRemaining()) {
- channel.write(buffer);
- }
- }
-
- @Override
- public boolean isConnected() {
- return channel != null && channel.isConnected();
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(channel);
- channel = null;
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
deleted file mode 100644
index e2f05cc..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Sends messages over an SSLSocketChannel.
- */
-public class SSLSocketChannelSender extends SocketChannelSender {
-
- private SSLContext sslContext;
- private SSLSocketChannel sslChannel;
- private SSLSocketChannelOutputStream sslOutputStream;
-
- public SSLSocketChannelSender(final String host,
- final int port,
- final int maxSendBufferSize,
- final SSLContext sslContext,
- final ComponentLog logger) {
- super(host, port, maxSendBufferSize, logger);
- this.sslContext = sslContext;
- }
-
- @Override
- public void open() throws IOException {
- if (sslChannel == null) {
- super.open();
- sslChannel = new SSLSocketChannel(sslContext, channel, true);
- }
- sslChannel.setTimeout(timeout);
-
- // SSLSocketChannel will check if already connected so we can safely call this
- sslChannel.connect();
- sslOutputStream = new SSLSocketChannelOutputStream(sslChannel);
- }
-
- @Override
- protected void write(byte[] data) throws IOException {
- sslChannel.write(data);
- }
-
- @Override
- public boolean isConnected() {
- return sslChannel != null && !sslChannel.isClosed();
- }
-
- @Override
- public void close() {
- // Close SSLSocketChannel before closing other resources
- IOUtils.closeQuietly(sslChannel);
- IOUtils.closeQuietly(sslOutputStream);
- super.close();
- sslChannel = null;
- }
-
- @Override
- public OutputStream getOutputStream() {
- return sslOutputStream;
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
deleted file mode 100644
index 9c1aa32..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.put.sender;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.StandardSocketOptions;
-import java.nio.channels.SocketChannel;
-
-/**
- * Sends messages over a SocketChannel.
- */
-public class SocketChannelSender extends ChannelSender {
-
- protected SocketChannel channel;
- protected SocketChannelOutputStream socketChannelOutput;
-
- public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
- super(host, port, maxSendBufferSize, logger);
- }
-
- @Override
- public void open() throws IOException {
- try {
- if (channel == null) {
- channel = SocketChannel.open();
- channel.configureBlocking(false);
-
- if (maxSendBufferSize > 0) {
- channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
- final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
- if (actualSendBufSize < maxSendBufferSize) {
- logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
- + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to "
- + "consider changing the Operating System's maximum send buffer");
- }
- }
- }
-
- if (!channel.isConnected()) {
- final long startTime = System.currentTimeMillis();
- final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port);
-
- if (!channel.connect(socketAddress)) {
- while (!channel.finishConnect()) {
- if (System.currentTimeMillis() > startTime + timeout) {
- throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
- }
-
- try {
- Thread.sleep(50L);
- } catch (final InterruptedException e) {
- }
- }
- }
-
- if (logger.isDebugEnabled()) {
- final SocketAddress localAddress = channel.getLocalAddress();
- if (localAddress != null && localAddress instanceof InetSocketAddress) {
- final InetSocketAddress inetSocketAddress = (InetSocketAddress) localAddress;
- logger.debug("Connected to local port {}", new Object[] {inetSocketAddress.getPort()});
- }
- }
-
- socketChannelOutput = new SocketChannelOutputStream(channel);
- socketChannelOutput.setTimeout(timeout);
- }
- } catch (final IOException e) {
- IOUtils.closeQuietly(channel);
- throw e;
- }
- }
-
- @Override
- protected void write(byte[] data) throws IOException {
- socketChannelOutput.write(data);
- }
-
- @Override
- public boolean isConnected() {
- return channel != null && channel.isConnected();
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(socketChannelOutput);
- IOUtils.closeQuietly(channel);
- socketChannelOutput = null;
- channel = null;
- }
-
- public OutputStream getOutputStream() {
- return new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- socketChannelOutput.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- socketChannelOutput.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- socketChannelOutput.write(b, off, len);
- }
-
- @Override
- public void close() throws IOException {
- socketChannelOutput.close();
- }
-
- @Override
- public void flush() throws IOException {
- socketChannelOutput.flush();
- updateLastUsed();
- }
- };
- }
-
- private void updateLastUsed() {
- this.lastUsed = System.currentTimeMillis();
- }
-
-}