blob: d3f37f46c5611538cf6790e29497e7bcba3bb080 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketProxy {
private static final transient Logger LOG = LoggerFactory.getLogger(SocketProxy.class);
public static final int ACCEPT_TIMEOUT_MILLIS = 100;
private URI proxyUrl;
private URI target;
private Acceptor acceptor;
private ServerSocket serverSocket;
private CountDownLatch closed = new CountDownLatch(1);
public List<Bridge> connections = new LinkedList<Bridge>();
private int listenPort = 0;
private int receiveBufferSize = -1;
private boolean pauseAtStart = false;
private int acceptBacklog = 50;
public SocketProxy() throws Exception {
}
public SocketProxy(URI uri) throws Exception {
this(0, uri);
}
public SocketProxy(int port, URI uri) throws Exception {
listenPort = port;
target = uri;
open();
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public void setTarget(URI tcpBrokerUri) {
target = tcpBrokerUri;
}
public void open() throws Exception {
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
if (proxyUrl == null) {
serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
proxyUrl = urlFromSocket(target, serverSocket);
} else {
serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
}
acceptor = new Acceptor(serverSocket, target);
if (pauseAtStart) {
acceptor.pause();
}
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
closed = new CountDownLatch(1);
}
public URI getUrl() {
return proxyUrl;
}
/*
* close all proxy connections and acceptor
*/
public void close() {
List<Bridge> connections;
synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
LOG.info("close, numConnectons=" + connections.size());
for (Bridge con : connections) {
closeConnection(con);
}
acceptor.close();
closed.countDown();
}
/*
* close all proxy receive connections, leaving acceptor
* open
*/
public void halfClose() {
List<Bridge> connections;
synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
LOG.info("halfClose, numConnectons=" + connections.size());
for (Bridge con : connections) {
halfCloseConnection(con);
}
}
public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
return closed.await(timeoutSeconds, TimeUnit.SECONDS);
}
/*
* called after a close to restart the acceptor on the same port
*/
public void reopen() {
LOG.info("reopen");
try {
open();
} catch (Exception e) {
LOG.debug("exception on reopen url:" + getUrl(), e);
}
}
/*
* pause accepting new connecitons and data transfer through existing proxy
* connections. All sockets remain open
*/
public void pause() {
synchronized(connections) {
LOG.info("pause, numConnectons=" + connections.size());
acceptor.pause();
for (Bridge con : connections) {
con.pause();
}
}
}
/*
* continue after pause
*/
public void goOn() {
synchronized(connections) {
LOG.info("goOn, numConnectons=" + connections.size());
for (Bridge con : connections) {
con.goOn();
}
}
acceptor.goOn();
}
private void closeConnection(Bridge c) {
try {
c.close();
} catch (Exception e) {
LOG.debug("exception on close of: " + c, e);
}
}
private void halfCloseConnection(Bridge c) {
try {
c.halfClose();
} catch (Exception e) {
LOG.debug("exception on half close of: " + c, e);
}
}
public boolean isPauseAtStart() {
return pauseAtStart;
}
public void setPauseAtStart(boolean pauseAtStart) {
this.pauseAtStart = pauseAtStart;
}
public int getAcceptBacklog() {
return acceptBacklog;
}
public void setAcceptBacklog(int acceptBacklog) {
this.acceptBacklog = acceptBacklog;
}
private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
int listenPort = serverSocket.getLocalPort();
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
}
public class Bridge {
private Socket receiveSocket;
private Socket sendSocket;
private Pump requestThread;
private Pump responseThread;
public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = new Socket();
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}
sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
linkWithThreads(receiveSocket, sendSocket);
LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
}
public void goOn() {
responseThread.goOn();
requestThread.goOn();
}
public void pause() {
requestThread.pause();
responseThread.pause();
}
public void close() throws Exception {
synchronized(connections) {
connections.remove(this);
}
receiveSocket.close();
sendSocket.close();
}
public void halfClose() throws Exception {
receiveSocket.close();
}
private void linkWithThreads(Socket source, Socket dest) {
requestThread = new Pump(source, dest);
requestThread.start();
responseThread = new Pump(dest, source);
responseThread.start();
}
public class Pump extends Thread {
protected Socket src;
private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Pump(Socket source, Socket dest) {
super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
src = source;
destination = dest;
pause.set(new CountDownLatch(0));
}
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
byte[] buf = new byte[1024];
try {
InputStream in = src.getInputStream();
OutputStream out = destination.getOutputStream();
while (true) {
int len = in.read(buf);
if (len == -1) {
LOG.debug("read eof from:" + src);
break;
}
pause.get().await();
out.write(buf, 0, len);
}
} catch (Exception e) {
LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
try {
if (!receiveSocket.isClosed()) {
// for halfClose, on read/write failure if we close the
// remote end will see a close at the same time.
close();
}
} catch (Exception ignore) {
}
}
}
}
}
public class Acceptor implements Runnable {
private ServerSocket socket;
private URI target;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Acceptor(ServerSocket serverSocket, URI uri) {
socket = serverSocket;
target = uri;
pause.set(new CountDownLatch(0));
try {
socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
} catch (SocketException e) {
e.printStackTrace();
}
}
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
try {
while(!socket.isClosed()) {
pause.get().await();
try {
Socket source = socket.accept();
pause.get().await();
if (receiveBufferSize > 0) {
source.setReceiveBufferSize(receiveBufferSize);
}
LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
synchronized(connections) {
connections.add(new Bridge(source, target));
}
} catch (SocketTimeoutException expected) {
}
}
} catch (Exception e) {
LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
}
}
public void close() {
try {
socket.close();
closed.countDown();
goOn();
} catch (IOException ignored) {
}
}
}
}