blob: 8c943b83493c5a584ee0b86dc903674613fe2114 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.mina.transport.socket.apr;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.polling.AbstractPollingIoConnector;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.SimpleIoProcessorPool;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.tomcat.jni.Address;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.Pool;
import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;
/**
* {@link IoConnector} for APR based socket transport (TCP/IP).
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
/**
* This constant is deduced from the APR code. It is used when the timeout
* has expired while doing a poll() operation.
*/
private static final int APR_TIMEUP_ERROR = -120001;
private static final int POLLSET_SIZE = 1024;
private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
private final Object wakeupLock = new Object();
private volatile long wakeupSocket;
private volatile boolean toBeWakenUp;
private volatile long pool;
private volatile long pollset; // socket poller
private final long[] polledSockets = new long[POLLSET_SIZE << 1];
private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
private volatile ByteBuffer dummyBuffer;
/**
* Create an {@link AprSocketConnector} with default configuration (multiple thread model).
*/
public AprSocketConnector() {
super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
* Constructor for {@link AprSocketConnector} with default configuration, and
* given number of {@link AprIoProcessor} for multithreading I/O operations
* @param processorCount the number of processor to create and place in a
* {@link SimpleIoProcessorPool}
*/
public AprSocketConnector(int processorCount) {
super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
* Constructor for {@link AprSocketConnector} with default configuration but a
* specific {@link IoProcessor}, useful for sharing the same processor over multiple
* {@link IoService} of the same type.
* @param processor the processor to use for managing I/O events
*/
public AprSocketConnector(IoProcessor<AprSession> processor) {
super(new DefaultSocketSessionConfig(), processor);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
* Constructor for {@link AprSocketConnector} with a given {@link Executor} for handling
* connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
* the same processor and executor over multiple {@link IoService} of the same type.
* @param executor the executor for connection
* @param processor the processor for I/O operations
*/
public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
super(new DefaultSocketSessionConfig(), executor, processor);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
* {@inheritDoc}
*/
@Override
protected void init() throws Exception {
// initialize a memory pool for APR functions
pool = Pool.create(AprLibrary.getInstance().getRootPool());
wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
dummyBuffer = Pool.alloc(pool, 1);
pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
if (pollset <= 0) {
pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
}
if (pollset <= 0) {
if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
}
}
}
/**
* {@inheritDoc}
*/
@Override
protected void destroy() throws Exception {
if (wakeupSocket > 0) {
Socket.close(wakeupSocket);
}
if (pollset > 0) {
Poll.destroy(pollset);
}
if (pool > 0) {
Pool.destroy(pool);
}
}
/**
* {@inheritDoc}
*/
@Override
protected Iterator<Long> allHandles() {
return polledHandles.iterator();
}
/**
* {@inheritDoc}
*/
@Override
protected boolean connect(Long handle, SocketAddress remoteAddress) throws Exception {
InetSocketAddress ra = (InetSocketAddress) remoteAddress;
long sa;
if (ra != null) {
if (ra.getAddress() == null) {
sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
} else {
sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
}
} else {
sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
}
int rv = Socket.connect(handle, sa);
if (rv == Status.APR_SUCCESS) {
return true;
}
if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
return false;
}
throwException(rv);
throw new InternalError(); // This sentence will never be executed.
}
/**
* {@inheritDoc}
*/
@Override
protected ConnectionRequest getConnectionRequest(Long handle) {
return requests.get(handle);
}
/**
* {@inheritDoc}
*/
@Override
protected void close(Long handle) throws Exception {
finishConnect(handle);
int rv = Socket.close(handle);
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
}
/**
* {@inheritDoc}
*/
@Override
protected boolean finishConnect(Long handle) throws Exception {
Poll.remove(pollset, handle);
requests.remove(handle);
if (failedHandles.remove(handle)) {
int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
throwException(rv);
throw new InternalError("Shouldn't reach here.");
}
return true;
}
/**
* {@inheritDoc}
*/
@Override
protected Long newHandle(SocketAddress localAddress) throws Exception {
long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
boolean success = false;
try {
int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
if (result != Status.APR_SUCCESS) {
throwException(result);
}
result = Socket.timeoutSet(handle, 0);
if (result != Status.APR_SUCCESS) {
throwException(result);
}
if (localAddress != null) {
InetSocketAddress la = (InetSocketAddress) localAddress;
long sa;
if (la.getAddress() == null) {
sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
} else {
sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
}
result = Socket.bind(handle, sa);
if (result != Status.APR_SUCCESS) {
throwException(result);
}
}
success = true;
return handle;
} finally {
if (!success) {
int rv = Socket.close(handle);
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
protected AprSession newSession(IoProcessor<AprSession> processor, Long handle) throws Exception {
return new AprSocketSession(this, processor, handle);
}
/**
* {@inheritDoc}
*/
@Override
protected void register(Long handle, ConnectionRequest request) throws Exception {
int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
if (rv != Status.APR_SUCCESS) {
throwException(rv);
}
requests.put(handle, request);
}
/**
* {@inheritDoc}
*/
@Override
protected int select(int timeout) throws Exception {
int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
if (rv <= 0) {
if (rv != APR_TIMEUP_ERROR) {
throwException(rv);
}
rv = Poll.maintain(pollset, polledSockets, true);
if (rv > 0) {
for (int i = 0; i < rv; i++) {
Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
}
} else if (rv < 0) {
throwException(rv);
}
return 0;
} else {
rv <<= 1;
if (!polledHandles.isEmpty()) {
polledHandles.clear();
}
for (int i = 0; i < rv; i++) {
long flag = polledSockets[i];
long socket = polledSockets[++i];
if (socket == wakeupSocket) {
synchronized (wakeupLock) {
Poll.remove(pollset, wakeupSocket);
toBeWakenUp = false;
}
continue;
}
polledHandles.add(socket);
if ((flag & Poll.APR_POLLOUT) == 0) {
failedHandles.add(socket);
}
}
return polledHandles.size();
}
}
/**
* {@inheritDoc}
*/
@Override
protected Iterator<Long> selectedHandles() {
return polledHandles.iterator();
}
/**
* {@inheritDoc}
*/
@Override
protected void wakeup() {
if (toBeWakenUp) {
return;
}
// Add a dummy socket to the pollset.
synchronized (wakeupLock) {
toBeWakenUp = true;
Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
}
}
/**
* {@inheritDoc}
*/
public TransportMetadata getTransportMetadata() {
return AprSocketSession.METADATA;
}
/**
* {@inheritDoc}
*/
public SocketSessionConfig getSessionConfig() {
return (SocketSessionConfig) sessionConfig;
}
/**
* {@inheritDoc}
*/
@Override
public InetSocketAddress getDefaultRemoteAddress() {
return (InetSocketAddress) super.getDefaultRemoteAddress();
}
/**
* {@inheritDoc}
*/
public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
super.setDefaultRemoteAddress(defaultRemoteAddress);
}
/**
* transform an APR error number in a more fancy exception
* @param code APR error code
* @throws IOException the produced exception for the given APR error number
*/
private void throwException(int code) throws IOException {
throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
}
}