blob: b9fd1de75c5fdade5b8625190f033fd09e407462 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.reactor.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.RecordImpl;
import org.apache.qpid.proton.reactor.Acceptor;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
@SuppressWarnings("deprecation")
public class AcceptorImpl implements Acceptor {
private Record attachments = new RecordImpl();
private final SelectableImpl sel;
protected static final String CONNECTION_ACCEPTOR_KEY = "pn_reactor_connection_acceptor";
private class AcceptorReadable implements Callback {
@Override
public void run(Selectable selectable) {
Reactor reactor = selectable.getReactor();
try {
SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept();
if (socketChannel == null) {
throw new ReactorInternalException("Selectable readable, but no socket to accept");
}
Handler handler = BaseHandler.getHandler(AcceptorImpl.this);
if (handler == null) {
handler = reactor.getHandler();
}
Connection conn = reactor.connection(handler);
Record conn_recs = conn.attachments();
conn_recs.set(CONNECTION_ACCEPTOR_KEY, Acceptor.class, AcceptorImpl.this);
InetSocketAddress peerAddr = (InetSocketAddress)socketChannel.getRemoteAddress();
if (peerAddr != null) {
Address addr = new Address();
addr.setHost(peerAddr.getHostString());
addr.setPort(Integer.toString(peerAddr.getPort()));
conn_recs.set(ReactorImpl.CONNECTION_PEER_ADDRESS_KEY, Address.class, addr);
}
Transport trans = Proton.transport();
Sasl sasl = trans.sasl();
sasl.server();
sasl.setMechanisms("ANONYMOUS");
sasl.done(SaslOutcome.PN_SASL_OK);
trans.bind(conn);
IOHandler.selectableTransport(reactor, socketChannel.socket(), trans);
} catch(IOException ioException) {
sel.error();
}
}
}
private static class AcceptorFree implements Callback {
@Override
public void run(Selectable selectable) {
try {
if (selectable.getChannel() != null) {
selectable.getChannel().close();
}
} catch(IOException ioException) {
// Ignore - as we can't make the channel any more closed...
}
}
}
protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel();
ssc.bind(new InetSocketAddress(host, port));
sel = ((ReactorImpl)reactor).selectable(this);
sel.setChannel(ssc);
sel.onReadable(new AcceptorReadable());
sel.onFree(new AcceptorFree());
sel.setReactor(reactor);
BaseHandler.setHandler(this, handler);
sel.setReading(true);
reactor.update(sel);
}
@Override
public void close() {
if (!sel.isTerminal()) {
Reactor reactor = sel.getReactor();
try {
sel.getChannel().close();
} catch(IOException ioException) {
// Ignore.
}
sel.setChannel(null);
sel.terminate();
reactor.update(sel);
}
}
// Used for unit tests, where acceptor is bound to an ephemeral port
public int getPortNumber() throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel();
return ((InetSocketAddress)ssc.getLocalAddress()).getPort();
}
@Override
public void free() {
sel.free();
}
@Override
public Record attachments() {
return attachments;
}
}