blob: fad88701ef3f0628b5789044f9263190c2250208 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.core.net;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.appender.OutputStreamManager;
import org.apache.logging.log4j.core.util.Closer;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.core.util.NullOutputStream;
import org.apache.logging.log4j.util.Strings;
/**
* Manager of TCP Socket connections.
*/
public class TcpSocketManager extends AbstractSocketManager {
/**
* The default reconnection delay (30000 milliseconds or 30 seconds).
*/
public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
/**
* The default port number of remote logging server (4560).
*/
private static final int DEFAULT_PORT = 4560;
private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
private final int reconnectionDelayMillis;
private Reconnector reconnector;
private Socket socket;
private final SocketOptions socketOptions;
private final boolean retry;
private final boolean immediateFail;
private final int connectTimeoutMillis;
/**
* Constructs.
*
* @param name
* The unique name of this connection.
* @param os
* The OutputStream.
* @param socket
* The Socket.
* @param inetAddress
* The Internet address of the host.
* @param host
* The name of the host.
* @param port
* The port number on the host.
* @param connectTimeoutMillis
* the connect timeout in milliseconds.
* @param reconnectionDelayMillis
* Reconnection interval.
* @param immediateFail
* True if the write should fail if no socket is immediately available.
* @param layout
* The Layout.
* @param bufferSize
* The buffer size.
*/
public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
final int bufferSize, final SocketOptions socketOptions) {
super(name, os, inetAddress, host, port, layout, true, bufferSize);
this.connectTimeoutMillis = connectTimeoutMillis;
this.reconnectionDelayMillis = reconnectionDelayMillis;
this.socket = socket;
this.immediateFail = immediateFail;
this.retry = reconnectionDelayMillis > 0;
if (socket == null) {
this.reconnector = createReconnector();
this.reconnector.start();
}
this.socketOptions = socketOptions;
}
/**
* Obtains a TcpSocketManager.
*
* @param host
* The host to connect to.
* @param port
* The port on the host.
* @param connectTimeoutMillis
* the connect timeout in milliseconds
* @param reconnectDelayMillis
* The interval to pause between retries.
* @param bufferSize
* The buffer size.
* @return A TcpSocketManager.
*/
public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
final int bufferSize, final SocketOptions socketOptions) {
if (Strings.isEmpty(host)) {
throw new IllegalArgumentException("A host name is required");
}
if (port <= 0) {
port = DEFAULT_PORT;
}
if (reconnectDelayMillis == 0) {
reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
}
return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
}
@SuppressWarnings("sync-override") // synchronization on "this" is done within the method
@Override
protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
if (socket == null) {
if (reconnector != null && !immediateFail) {
reconnector.latch();
}
if (socket == null) {
throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
}
}
synchronized (this) {
try {
writeAndFlush(bytes, offset, length, immediateFlush);
} catch (final IOException causeEx) {
if (retry && reconnector == null) {
final String config = inetAddress + ":" + port;
reconnector = createReconnector();
try {
reconnector.reconnect();
} catch (final IOException reconnEx) {
LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
reconnector.start();
throw new AppenderLoggingException(
String.format("Error sending to %s for %s", getName(), config), causeEx);
}
try {
writeAndFlush(bytes, offset, length, immediateFlush);
} catch (final IOException e) {
throw new AppenderLoggingException(
String.format("Error writing to %s after reestablishing connection for %s", getName(),
config),
causeEx);
}
}
}
}
}
private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
throws IOException {
@SuppressWarnings("resource") // outputStream is managed by this class
final OutputStream outputStream = getOutputStream();
outputStream.write(bytes, offset, length);
if (immediateFlush) {
outputStream.flush();
}
}
@Override
protected synchronized boolean closeOutputStream() {
final boolean closed = super.closeOutputStream();
if (reconnector != null) {
reconnector.shutdown();
reconnector.interrupt();
reconnector = null;
}
final Socket oldSocket = socket;
socket = null;
if (oldSocket != null) {
try {
oldSocket.close();
} catch (final IOException e) {
LOGGER.error("Could not close socket {}", socket);
return false;
}
}
return closed;
}
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
/**
* Gets this TcpSocketManager's content format. Specified by:
* <ul>
* <li>Key: "protocol" Value: "tcp"</li>
* <li>Key: "direction" Value: "out"</li>
* </ul>
*
* @return Map of content format keys supporting TcpSocketManager
*/
@Override
public Map<String, String> getContentFormat() {
final Map<String, String> result = new HashMap<>(super.getContentFormat());
result.put("protocol", "tcp");
result.put("direction", "out");
return result;
}
/**
* Handles reconnecting to a Socket on a Thread.
*/
private class Reconnector extends Log4jThread {
private final CountDownLatch latch = new CountDownLatch(1);
private boolean shutdown = false;
private final Object owner;
public Reconnector(final OutputStreamManager owner) {
super("TcpSocketManager-Reconnector");
this.owner = owner;
}
public void latch() {
try {
latch.await();
} catch (final InterruptedException ex) {
// Ignore the exception.
}
}
public void shutdown() {
shutdown = true;
}
@Override
public void run() {
while (!shutdown) {
try {
sleep(reconnectionDelayMillis);
reconnect();
} catch (final InterruptedException ie) {
LOGGER.debug("Reconnection interrupted.");
} catch (final ConnectException ex) {
LOGGER.debug("{}:{} refused connection", host, port);
} catch (final IOException ioe) {
LOGGER.debug("Unable to reconnect to {}:{}", host, port);
} finally {
latch.countDown();
}
}
}
void reconnect() throws IOException {
final List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port);
if (socketAddresses.size() == 1) {
LOGGER.debug("Reconnecting " + socketAddresses.get(0));
connect(socketAddresses.get(0));
} else {
IOException ioe = null;
for (final InetSocketAddress socketAddress : socketAddresses) {
try {
LOGGER.debug("Reconnecting " + socketAddress);
connect(socketAddress);
return;
} catch (final IOException ex) {
ioe = ex;
}
}
throw ioe;
}
}
private void connect(final InetSocketAddress socketAddress) throws IOException {
final Socket sock = createSocket(socketAddress);
@SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
final OutputStream newOS = sock.getOutputStream();
final InetAddress prev = socket != null ? socket.getInetAddress() : null;
synchronized (owner) {
Closer.closeSilently(getOutputStream());
setOutputStream(newOS);
socket = sock;
reconnector = null;
shutdown = true;
}
final String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ?
"reestablished" : "established";
LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket);
}
@Override
public String toString() {
return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]";
}
}
private Reconnector createReconnector() {
final Reconnector recon = new Reconnector(this);
recon.setDaemon(true);
recon.setPriority(Thread.MIN_PRIORITY);
return recon;
}
protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException {
return createSocket(socketAddress, socketOptions, connectTimeoutMillis);
}
protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions,
final int connectTimeoutMillis) throws IOException {
LOGGER.debug("Creating socket {}", socketAddress.toString());
final Socket newSocket = new Socket();
if (socketOptions != null) {
// Not sure which options must be applied before or after the connect() call.
socketOptions.apply(newSocket);
}
newSocket.connect(socketAddress, connectTimeoutMillis);
if (socketOptions != null) {
// Not sure which options must be applied before or after the connect() call.
socketOptions.apply(newSocket);
}
return newSocket;
}
/**
* Data for the factory.
*/
static class FactoryData {
protected final String host;
protected final int port;
protected final int connectTimeoutMillis;
protected final int reconnectDelayMillis;
protected final boolean immediateFail;
protected final Layout<? extends Serializable> layout;
protected final int bufferSize;
protected final SocketOptions socketOptions;
public FactoryData(final String host, final int port, final int connectTimeoutMillis,
final int reconnectDelayMillis, final boolean immediateFail,
final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
this.host = host;
this.port = port;
this.connectTimeoutMillis = connectTimeoutMillis;
this.reconnectDelayMillis = reconnectDelayMillis;
this.immediateFail = immediateFail;
this.layout = layout;
this.bufferSize = bufferSize;
this.socketOptions = socketOptions;
}
@Override
public String toString() {
return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
+ ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
+ ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
}
}
/**
* Factory to create a TcpSocketManager.
*
* @param <M>
* The manager type.
* @param <T>
* The factory data type.
*/
protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
implements ManagerFactory<M, T> {
static HostResolver resolver = new HostResolver();
@SuppressWarnings("resource")
@Override
public M createManager(final String name, final T data) {
final InetAddress inetAddress;
OutputStream os;
try {
inetAddress = InetAddress.getByName(data.host);
} catch (final UnknownHostException ex) {
LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
return null;
}
Socket socket = null;
try {
// LOG4J2-1042
socket = createSocket(data);
os = socket.getOutputStream();
return createManager(name, os, socket, inetAddress, data);
} catch (final IOException ex) {
LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
os = NullOutputStream.getInstance();
}
if (data.reconnectDelayMillis == 0) {
Closer.closeSilently(socket);
return null;
}
return createManager(name, os, null, inetAddress, data);
}
@SuppressWarnings("unchecked")
M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
data.bufferSize, data.socketOptions);
}
Socket createSocket(final T data) throws IOException {
final List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port);
IOException ioe = null;
for (final InetSocketAddress socketAddress : socketAddresses) {
try {
return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis);
} catch (final IOException ex) {
ioe = ex;
}
}
throw new IOException(errorMessage(data, socketAddresses) , ioe);
}
protected String errorMessage(final T data, final List<InetSocketAddress> socketAddresses) {
final StringBuilder sb = new StringBuilder("Unable to create socket for ");
sb.append(data.host).append(" at port ").append(data.port);
if (socketAddresses.size() == 1) {
if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) {
sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress());
sb.append(" and port ").append(socketAddresses.get(0).getPort());
}
} else {
sb.append(" using ip addresses and ports ");
for (int i = 0; i < socketAddresses.size(); ++i) {
if (i > 0) {
sb.append(", ");
sb.append(socketAddresses.get(i).getAddress().getHostAddress());
sb.append(":").append(socketAddresses.get(i).getPort());
}
}
}
return sb.toString();
}
}
/**
* This method is only for unit testing. It is not Thread-safe.
* @param resolver the HostResolver.
*/
public static void setHostResolver(final HostResolver resolver) {
TcpSocketManagerFactory.resolver = resolver;
}
public static class HostResolver {
public List<InetSocketAddress> resolveHost(final String host, final int port) throws UnknownHostException {
final InetAddress[] addresses = InetAddress.getAllByName(host);
final List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length);
for (final InetAddress address: addresses) {
socketAddresses.add(new InetSocketAddress(address, port));
}
return socketAddresses;
}
}
/**
* USE AT YOUR OWN RISK, method is public for testing purpose only for now.
*/
public SocketOptions getSocketOptions() {
return socketOptions;
}
/**
* USE AT YOUR OWN RISK, method is public for testing purpose only for now.
*/
public Socket getSocket() {
return socket;
}
public int getReconnectionDelayMillis() {
return reconnectionDelayMillis;
}
@Override
public String toString() {
return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
+ ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
+ immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
+ ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
+ count + "]";
}
}