// blob: dc8a1ca2d0113e2f090930efa9caabf08cd19e22 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.as2.api;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import org.apache.camel.component.as2.api.entity.DispositionNotificationMultipartReportEntity;
import org.apache.camel.component.as2.api.io.AS2BHttpServerConnection;
import org.apache.camel.component.as2.api.protocol.ResponseMDN;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpException;
import org.apache.http.HttpInetConnection;
import org.apache.http.HttpServerConnection;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.protocol.UriHttpRequestHandlerMapper;
import org.apache.http.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Server side of an AS2 connection.
 * <p>
 * Constructing an instance opens a {@link ServerSocket} on the configured port and starts a daemon
 * {@link RequestListenerThread} that accepts incoming connections. Every accepted connection is served
 * by its own daemon {@link RequestHandlerThread}. Request handlers are attached to URI patterns with
 * {@link #listen(String, HttpRequestHandler)} and detached with {@link #stopListening(String)};
 * {@link #close()} closes the server socket, which ends the listener thread.
 * <p>
 * {@link #listen(String, HttpRequestHandler)}, {@link #stopListening(String)} and {@link #close()} are
 * serialized on a private lock so that none of them can observe the listener being discarded half-way.
 */
public class AS2ServerConnection {

    private static final Logger LOG = LoggerFactory.getLogger(AS2ServerConnection.class);

    private static final String REQUEST_LISTENER_THREAD_NAME_PREFIX = "AS2Svr-";
    private static final String REQUEST_HANDLER_THREAD_NAME_PREFIX = "AS2Hdlr-";

    /**
     * Thread that owns the server socket, accepts incoming connections and hands each of them to a new
     * {@link RequestHandlerThread}.
     */
    class RequestListenerThread extends Thread {

        private final ServerSocket serversocket;
        private final HttpService httpService;
        private final UriHttpRequestHandlerMapper registry;

        /**
         * Builds the HTTP service and binds the server socket.
         *
         * @param  as2Version              AS2 version passed to the protocol processor
         * @param  originServer            value passed to the protocol processor for the server header
         * @param  serverFqdn              server name passed to the protocol processor
         * @param  port                    port to listen on; also used in the thread name
         * @param  signatureAlgorithm      signature algorithm passed to the protocol processor
         * @param  signingCertificateChain certificate chain passed to the protocol processor
         * @param  signingPrivateKey       signing key passed to the protocol processor
         * @param  decryptingPrivateKey    decrypting key passed to the protocol processor
         * @throws IOException             if the server socket cannot be opened
         */
        public RequestListenerThread(String as2Version,
                                     String originServer,
                                     String serverFqdn,
                                     int port,
                                     AS2SignatureAlgorithm signatureAlgorithm,
                                     Certificate[] signingCertificateChain,
                                     PrivateKey signingPrivateKey,
                                     PrivateKey decryptingPrivateKey)
                throws IOException {
            setName(REQUEST_LISTENER_THREAD_NAME_PREFIX + port);

            // Set up HTTP protocol processor for incoming connections
            final HttpProcessor inhttpproc = initProtocolProcessor(as2Version, originServer, serverFqdn, port,
                    signatureAlgorithm, signingCertificateChain, signingPrivateKey, decryptingPrivateKey);

            registry = new UriHttpRequestHandlerMapper();

            // Set up the HTTP service
            httpService = new HttpService(inhttpproc, registry);

            // Open the socket last so that a failure while building the service cannot leak a bound socket.
            serversocket = new ServerSocket(port);
        }

        /**
         * Accepts connections until the thread is interrupted, the server socket is closed or accepting
         * fails. The server socket is always closed when the loop ends.
         */
        @Override
        public void run() {
            LOG.info("Listening on port {}", this.serversocket.getLocalPort());
            try {
                while (!Thread.interrupted()) {
                    Socket insocket;
                    try {
                        insocket = this.serversocket.accept();
                    } catch (final InterruptedIOException ex) {
                        break;
                    } catch (final SocketException e) {
                        // Server socket closed
                        break;
                    } catch (final IOException e) {
                        LOG.error("I/O error initialising connection thread: {}", e.getMessage(), e);
                        break;
                    }
                    startHandler(insocket);
                }
            } finally {
                closeServerSocket();
            }
        }

        /**
         * Binds the accepted socket to an HTTP connection and starts a daemon worker thread for it.
         * A failure here affects only this client: the socket is closed and the listener keeps running.
         */
        private void startHandler(Socket insocket) {
            final int bufsize = 8 * 1024;
            try {
                // Set up incoming HTTP connection
                final AS2BHttpServerConnection inconn = new AS2BHttpServerConnection(bufsize);
                LOG.info("Incoming connection from {}", insocket.getInetAddress());
                inconn.bind(insocket);

                // Start worker thread
                final Thread t = new RequestHandlerThread(this.httpService, inconn);
                t.setDaemon(true);
                t.start();
            } catch (final IOException e) {
                LOG.error("I/O error initialising connection thread: {}", e.getMessage(), e);
                try {
                    insocket.close();
                } catch (final IOException closeError) {
                    LOG.debug("Failed to close client socket: {}", closeError.getMessage(), closeError);
                }
            }
        }

        /** Closes the server socket, logging (not propagating) any failure. */
        private void closeServerSocket() {
            try {
                this.serversocket.close();
            } catch (final IOException e) {
                LOG.debug("Failed to close server socket: {}", e.getMessage(), e);
            }
        }

        /** Registers {@code httpRequestHandler} for {@code requestUriPattern} with the handler mapper. */
        void registerHandler(String requestUriPattern, HttpRequestHandler httpRequestHandler) {
            registry.register(requestUriPattern, httpRequestHandler);
        }

        /** Removes the handler registered for {@code requestUri} from the handler mapper. */
        void unregisterHandler(String requestUri) {
            registry.unregister(requestUri);
        }

    }

    /**
     * Thread that serves the requests arriving on one accepted connection and, when the request context
     * carries a recipient address afterwards, sends the asynchronous MDN stored in that context.
     */
    class RequestHandlerThread extends Thread {

        private final HttpService httpService;
        private final HttpServerConnection serverConnection;

        /**
         * @param httpService      service used to handle each request
         * @param serverConnection connection to read requests from; shut down when the thread ends
         */
        public RequestHandlerThread(HttpService httpService, HttpServerConnection serverConnection) {
            if (serverConnection instanceof HttpInetConnection) {
                HttpInetConnection inetConnection = (HttpInetConnection) serverConnection;
                setName(REQUEST_HANDLER_THREAD_NAME_PREFIX + inetConnection.getLocalPort());
            } else {
                setName(REQUEST_HANDLER_THREAD_NAME_PREFIX + getId());
            }
            this.httpService = httpService;
            this.serverConnection = serverConnection;
        }

        /**
         * Handles requests on the connection until the thread is interrupted or an exception ends the
         * loop, then shuts the connection down.
         */
        @Override
        public void run() {
            LOG.info("Processing new AS2 request");
            final HttpContext context = new BasicHttpContext(null);
            try {
                while (!Thread.interrupted()) {
                    this.httpService.handleRequest(this.serverConnection, context);
                    sendAsynchronousMdnIfRequested(context);
                }
            } catch (final ConnectionClosedException ex) {
                LOG.info("Client closed connection");
            } catch (final IOException ex) {
                LOG.error("I/O error: {}", ex.getMessage(), ex);
            } catch (final HttpException ex) {
                LOG.error("Unrecoverable HTTP protocol violation: {}", ex.getMessage(), ex);
            } finally {
                try {
                    this.serverConnection.shutdown();
                } catch (final IOException e) {
                    // Best effort: the connection is being discarded anyway.
                    LOG.debug("Failed to shut down connection: {}", e.getMessage(), e);
                }
            }
        }

        /**
         * Sends the asynchronous MDN found in {@code context}, if a recipient address is present.
         * <p>
         * The same context is reused for every request on this connection, so both attributes are removed
         * before sending; otherwise a later request that does not set them would see the previous values
         * and the old MDN would be sent again.
         * NOTE(review): assumes nothing downstream relies on these attributes surviving across requests.
         */
        private void sendAsynchronousMdnIfRequested(HttpContext context) throws IOException, HttpException {
            final HttpCoreContext coreContext = HttpCoreContext.adapt(context);
            final String recipientAddress = coreContext.getAttribute(AS2AsynchronousMDNManager.RECIPIENT_ADDRESS,
                    String.class);
            if (recipientAddress == null) {
                return;
            }

            final DispositionNotificationMultipartReportEntity multipartReportEntity = coreContext.getAttribute(
                    AS2AsynchronousMDNManager.ASYNCHRONOUS_MDN,
                    DispositionNotificationMultipartReportEntity.class);

            coreContext.removeAttribute(AS2AsynchronousMDNManager.RECIPIENT_ADDRESS);
            coreContext.removeAttribute(AS2AsynchronousMDNManager.ASYNCHRONOUS_MDN);

            final AS2AsynchronousMDNManager asynchronousMDNManager = new AS2AsynchronousMDNManager(as2Version,
                    originServer, serverFqdn, signingCertificateChain, signingPrivateKey);
            asynchronousMDNManager.send(multipartReportEntity, recipientAddress);
        }

    }

    /** Guards {@link #listenerThread}; it is set to {@code null} by {@link #close()}. */
    private final Object lock = new Object();

    private RequestListenerThread listenerThread;
    private final String as2Version;
    private final String originServer;
    private final String serverFqdn;
    private final Integer serverPortNumber;
    private final AS2SignatureAlgorithm signingAlgorithm;
    private final Certificate[] signingCertificateChain;
    private final PrivateKey signingPrivateKey;
    private final PrivateKey decryptingPrivateKey;

    /**
     * Creates the connection and immediately starts a daemon thread listening on {@code serverPortNumber}.
     *
     * @param  as2Version               AS2 version; must not be {@code null}
     * @param  originServer             origin server value; must not be {@code null}
     * @param  serverFqdn               server name; must not be {@code null}
     * @param  serverPortNumber         port to listen on; must not be {@code null}
     * @param  signingAlgorithm         signature algorithm handed to the protocol processor
     * @param  signingCertificateChain  certificate chain handed to the protocol processor and MDN manager
     * @param  signingPrivateKey        signing key handed to the protocol processor and MDN manager
     * @param  decryptingPrivateKey     decrypting key handed to the protocol processor
     * @throws IOException              if the server socket cannot be opened
     * @throws IllegalArgumentException if a required argument is {@code null}
     */
    public AS2ServerConnection(String as2Version,
                               String originServer,
                               String serverFqdn,
                               Integer serverPortNumber,
                               AS2SignatureAlgorithm signingAlgorithm,
                               Certificate[] signingCertificateChain,
                               PrivateKey signingPrivateKey,
                               PrivateKey decryptingPrivateKey)
            throws IOException {
        this.as2Version = Args.notNull(as2Version, "as2Version");
        this.originServer = Args.notNull(originServer, "originServer");
        this.serverFqdn = Args.notNull(serverFqdn, "serverFqdn");
        this.serverPortNumber = Args.notNull(serverPortNumber, "serverPortNumber");
        this.signingAlgorithm = signingAlgorithm;
        this.signingCertificateChain = signingCertificateChain;
        this.signingPrivateKey = signingPrivateKey;
        this.decryptingPrivateKey = decryptingPrivateKey;

        listenerThread = new RequestListenerThread(this.as2Version, this.originServer, this.serverFqdn,
                this.serverPortNumber, this.signingAlgorithm, this.signingCertificateChain,
                this.signingPrivateKey, this.decryptingPrivateKey);
        listenerThread.setDaemon(true);
        listenerThread.start();
    }

    /** @return the private key supplied for signing, possibly {@code null} */
    public PrivateKey getSigningPrivateKey() {
        return signingPrivateKey;
    }

    /** @return the private key supplied for decrypting, possibly {@code null} */
    public PrivateKey getDecryptingPrivateKey() {
        return decryptingPrivateKey;
    }

    /**
     * Closes the server socket and discards the listener. Calling it again has no effect. A failure to
     * close the socket is logged at debug level and not propagated.
     */
    public void close() {
        synchronized (lock) {
            if (listenerThread == null) {
                return;
            }
            try {
                listenerThread.serversocket.close();
            } catch (IOException e) {
                LOG.debug(e.getMessage(), e);
            } finally {
                listenerThread = null;
            }
        }
    }

    /**
     * Registers {@code handler} for requests matching {@code requestUri}. Does nothing once the
     * connection has been closed.
     *
     * @param  requestUri  URI pattern passed to the handler mapper
     * @param  handler     handler to invoke for matching requests
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public void listen(String requestUri, HttpRequestHandler handler) throws IOException {
        synchronized (lock) {
            if (listenerThread != null) {
                listenerThread.registerHandler(requestUri, handler);
            }
        }
    }

    /**
     * Removes the handler registered for {@code requestUri}. Does nothing once the connection has been
     * closed.
     *
     * @param requestUri URI pattern whose handler is removed
     */
    public void stopListening(String requestUri) {
        synchronized (lock) {
            if (listenerThread != null) {
                listenerThread.unregisterHandler(requestUri);
            }
        }
    }

    /**
     * Builds the response interceptor chain used for every incoming connection: content, server, date
     * and connection-control headers followed by {@link ResponseMDN}.
     * <p>
     * Called while the listener thread is being constructed, i.e. from within this class's constructor;
     * overriding implementations must not rely on subclass state initialised after {@code super(...)}.
     *
     * @param  as2Version              AS2 version passed to {@link ResponseMDN}
     * @param  originServer            value passed to {@link ResponseServer}
     * @param  serverFqdn              server name passed to {@link ResponseMDN}
     * @param  port                    listening port; unused by this implementation
     * @param  signatureAlgorithm      signature algorithm passed to {@link ResponseMDN}
     * @param  signingCertificateChain certificate chain passed to {@link ResponseMDN}
     * @param  signingPrivateKey       signing key passed to {@link ResponseMDN}
     * @param  decryptingPrivateKey    decrypting key passed to {@link ResponseMDN}
     * @return                         the assembled processor
     */
    protected HttpProcessor initProtocolProcessor(String as2Version,
                                                  String originServer,
                                                  String serverFqdn,
                                                  int port,
                                                  AS2SignatureAlgorithm signatureAlgorithm,
                                                  Certificate[] signingCertificateChain,
                                                  PrivateKey signingPrivateKey,
                                                  PrivateKey decryptingPrivateKey) {
        return HttpProcessorBuilder.create()
                .add(new ResponseContent(true))
                .add(new ResponseServer(originServer))
                .add(new ResponseDate())
                .add(new ResponseConnControl())
                .add(new ResponseMDN(as2Version, serverFqdn, signatureAlgorithm, signingCertificateChain,
                        signingPrivateKey, decryptingPrivateKey))
                .build();
    }

}