blob: 030760e3763834d67b72d812c718ed0758a67594 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.vinci.transport;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.vinci.debug.Debug;
import org.apache.vinci.transport.context.VinciContext;
/**
* Maintains a pool of connections to a given service and allows thread-safe querying of that
* service. This provides a set of sendAndReceive methods with signatures equivalent to those in
* VinciClient, but unlike VinciClient, the methods can be invoked concurrently by multiple threads.
*/
public class PooledVinciClient {
private VinciContext context = VinciContext.getGlobalContext();
private TransportableFactory factory = VinciFrame.getVinciFrameFactory();
int connectTimeout = BaseClient.DEFAULT_CONNECT_TIMEOUT;
int socketTimeout = BaseClient.DEFAULT_SOCKET_TIMEOUT;
private String serviceName;
private int maxPoolSize;
private VinciClient[] availableClients;
private int availableClientsStartIndex;
boolean closed;
/**
* Create a PooledVinciClient that will establish at most maxPoolSize connections to the
* designated service.
* @param serviceName -
* @param maxPoolSize -
*/
public PooledVinciClient(String serviceName, int maxPoolSize) {
this.serviceName = serviceName;
this.maxPoolSize = maxPoolSize;
this.availableClients = new VinciClient[maxPoolSize];
this.availableClientsStartIndex = 0;
this.closed = false;
}
/**
* Set a VinciContext that will be used by this PooledVinciClient instead of the default global
* context.
* @param context -
*/
public void setContext(VinciContext context) {
this.context = context;
}
/**
* Set a connect timeout that will be used in place of BaseClient.DEFAULT_CONNECT_TIMEOUT
* @param connectTimeoutMillis -
*/
public void setConnectTimeout(int connectTimeoutMillis) {
this.connectTimeout = connectTimeoutMillis;
}
/**
* Set a socket timeout that will be used in place of BaseClient.DEFAULT_SOCKET_TIMEOUT
* @param socketTimeoutMillis -
*/
public void setSocketTimeout(int socketTimeoutMillis) {
this.socketTimeout = socketTimeoutMillis;
}
/**
* Set a transportable factory that will be used in place of the VinciFrame factory.
* @param factory -
*/
public void setTransportableFactory(TransportableFactory factory) {
this.factory = factory;
}
/**
* Get the service name to which this client connects.
* @return -
*/
public String getServiceName() {
return serviceName;
}
/**
* Send a request to the service and receive the response. This method is tread safe.
* @param in -
* @return -
* @throws IOException -
* @throws ServiceException -
*/
public Transportable sendAndReceive(Transportable in) throws IOException, ServiceException {
VinciClient c = getClientFromPool();
try {
return c.sendAndReceive(in);
} finally {
releaseClient(c);
}
}
/**
* Send a request to the service and receive the response, using the provided transportable
* factory in place of the client-provided one. This method is tread safe.
* @param in -
* @param f -
* @return -
* @throws IOException -
* @throws ServiceException -
*/
public Transportable sendAndReceive(Transportable in, TransportableFactory f) throws IOException,
ServiceException {
VinciClient c = getClientFromPool();
try {
return c.sendAndReceive(in, f);
} finally {
releaseClient(c);
}
}
/**
* Send a request to the service and receive the response, using the provided transportable
* factory and socketTimeout in place of the client-provided ones. This method is tread safe.
* @param in -
* @param f -
* @param socketTimeout -
* @return -
* @throws IOException -
* @throws ServiceException -
*/
public Transportable sendAndReceive(Transportable in, TransportableFactory f, int socketTimeout)
throws IOException, ServiceException {
VinciClient c = getClientFromPool();
try {
return c.sendAndReceive(in, f, socketTimeout);
} finally {
releaseClient(c);
}
}
/**
* Send a request to the service and receive the response, using the provided socketTimeout in
* place of the client-provided one. This method is tread safe.
* @param in -
* @param socketTimeout -
* @return -
* @throws IOException -
* @throws ServiceException -
*/
public Transportable sendAndReceive(Transportable in, int socketTimeout) throws IOException,
ServiceException {
VinciClient c = getClientFromPool();
try {
return c.sendAndReceive(in, socketTimeout);
} finally {
releaseClient(c);
}
}
/**
* Close this pooled client. Blocked requests will return IOException, as will any requests
* following the invocation of this method. Once a pooled client is closed it cannot be reused.
*
* @param wait
* If true, this method will block until all in-progress requests have completed,
* otherwise this method will return immediately (though in progress requests will still
* be allowed to complete)
*/
public void close(boolean wait) {
ArrayList closeUs = new ArrayList();
synchronized (this) {
if (!closed) {
closed = true;
for (int i = availableClientsStartIndex; i < maxPoolSize; i++) {
if (availableClients[availableClientsStartIndex] != null) {
closeUs.add(availableClients[availableClientsStartIndex]);
availableClients[availableClientsStartIndex] = null;
}
}
this.notifyAll();
}
}
for (int i = 0; i < closeUs.size(); i++) {
((VinciClient) closeUs.get(i)).close();
}
if (wait) {
boolean wasInterrupted = false;
synchronized (this) {
while (availableClientsStartIndex > 0) {
try {
this.wait();
} catch (InterruptedException e) {
wasInterrupted = true;
}
}
}
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
}
/**
* Retrieve a client from the pool, blocking for at most "socketTimeout" seconds. If the wait
* block time is exceeded, an IOException will be thrown. Connections obtained by this method
* *must* be released by calling releaseClient().
*
* @throws IOException
* if the wait() time before a connectoin becomes available exceeds the socketTimeout
* value, or if thrown by a failed service connection attempt.
*/
private VinciClient getClientFromPool() throws IOException {
VinciClient client;
synchronized (this) {
if (availableClientsStartIndex == maxPoolSize && !closed) {
try {
this.wait(socketTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("interrupted while waiting for available client");
}
if (availableClientsStartIndex == maxPoolSize) {
throw new IOException("waited too long for available client");
}
}
if (closed) {
throw new IOException("client is closed");
}
if (availableClients[availableClientsStartIndex] == null) {
Debug.p("Creating new client for pool: " + availableClientsStartIndex);
VinciClient c = new VinciClient(factory);
c.setConnectTimeout(connectTimeout);
c.setSocketTimeout(socketTimeout);
c.setContext(context);
availableClients[availableClientsStartIndex] = c;
}
client = availableClients[availableClientsStartIndex];
availableClientsStartIndex++;
} // synchronized this
try {
if (!client.isOpen()) {
client.open(serviceName);
}
VinciClient returnMe = client;
client = null;
return returnMe;
} finally {
if (client != null) {
releaseClient(client);
}
}
}
/**
* Release a client that has been obtained from getClientFromPool.
*/
synchronized private void releaseClient(VinciClient c) {
if (closed) {
--availableClientsStartIndex;
this.notify();
c.close();
} else {
availableClients[--availableClientsStartIndex] = c;
this.notify();
}
}
} // end class PooledVinciClient