blob: cb8853eed4b6c35513631c0ad25e0738b7a06c8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.aries.rsa.provider.fastbin.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import org.apache.aries.rsa.provider.fastbin.io.TransportAcceptListener;
import org.apache.aries.rsa.provider.fastbin.io.TransportServer;
import org.apache.aries.rsa.provider.fastbin.util.IntrospectionSupport;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;
/**
* A TCP based transport server
*
*/
public class TcpTransportServer implements TransportServer {
private final String bindScheme;
private final InetSocketAddress bindAddress;
private int backlog = 100;
private Map<String, Object> transportOptions;
private ServerSocketChannel channel;
private TransportAcceptListener listener;
private DispatchQueue dispatchQueue;
private DispatchSource acceptSource;
public TcpTransportServer(URI location) throws UnknownHostException {
bindScheme = location.getScheme();
String host = location.getHost();
host = (host == null || host.length() == 0) ? "::" : host;
bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort());
}
public void setAcceptListener(TransportAcceptListener listener) {
this.listener = listener;
}
public InetSocketAddress getSocketAddress() {
return (InetSocketAddress) channel.socket().getLocalSocketAddress();
}
public DispatchQueue getDispatchQueue() {
return dispatchQueue;
}
public void setDispatchQueue(DispatchQueue dispatchQueue) {
this.dispatchQueue = dispatchQueue;
}
public void suspend() {
acceptSource.suspend();
}
public void resume() {
acceptSource.resume();
}
public void start() throws Exception {
start(null);
}
public void start(Runnable onCompleted) throws Exception {
try {
channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(bindAddress, backlog);
} catch (IOException e) {
throw new IOException("Failed to bind to server socket: " + bindAddress + " due to: " + e, e);
}
acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
acceptSource.setEventHandler(new Runnable() {
public void run() {
try {
SocketChannel client = channel.accept();
while( client!=null ) {
handleSocket(client);
client = channel.accept();
}
} catch (Exception e) {
listener.onAcceptError(TcpTransportServer.this, e);
}
}
});
acceptSource.setCancelHandler(new Runnable() {
public void run() {
try {
channel.close();
} catch (IOException e) {
}
}
});
acceptSource.resume();
if( onCompleted!=null ) {
dispatchQueue.execute(onCompleted);
}
}
public String getBoundAddress() {
try {
return new URI(bindScheme, null, bindAddress.getAddress().getHostAddress(), channel.socket().getLocalPort(), null, null, null).toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
public String getConnectAddress() {
try {
return new URI(bindScheme, null, resolveHostName(), channel.socket().getLocalPort(), null, null, null).toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
protected String resolveHostName() {
String result;
if (bindAddress.getAddress().isAnyLocalAddress()) {
// make it more human readable and useful, an alternative to 0.0.0.0
try {
result = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
result = "localhost";
}
} else {
result = bindAddress.getAddress().getCanonicalHostName();
}
return result;
}
public void stop() {
stop(null);
}
public void stop(final Runnable onCompleted) {
if( acceptSource.isCanceled() ) {
onCompleted.run();
} else {
acceptSource.setCancelHandler(new Runnable() {
public void run() {
try {
channel.close();
} catch (IOException e) {
}
if( onCompleted!=null ) {
onCompleted.run();
}
}
});
acceptSource.cancel();
}
}
public int getBacklog() {
return backlog;
}
public void setBacklog(int backlog) {
this.backlog = backlog;
}
protected final void handleSocket(SocketChannel socket) throws Exception {
HashMap<String, Object> options = new HashMap<>();
// options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
// options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
// options.put("trace", Boolean.valueOf(trace));
// options.put("soTimeout", Integer.valueOf(soTimeout));
// options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
// options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
// options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
// options.put("startLogging", Boolean.valueOf(startLogging));
TcpTransport transport = createTransport(socket, options);
listener.onAccept(this, transport);
}
protected TcpTransport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws Exception {
TcpTransport transport = createTransport();
transport.connected(socketChannel);
if( options!=null ) {
IntrospectionSupport.setProperties(transport, options);
}
if (transportOptions != null) {
IntrospectionSupport.setProperties(transport, transportOptions);
}
return transport;
}
protected TcpTransport createTransport() {
return new TcpTransport();
}
public void setTransportOption(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
/**
* @return pretty print of this
*/
public String toString() {
return getBoundAddress();
}
}