| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.openjpa.event; |
| |
| import java.io.BufferedInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.OutputStream; |
| import java.net.InetAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.net.UnknownHostException; |
| import java.security.AccessController; |
| import java.security.PrivilegedActionException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.pool2.BasePooledObjectFactory; |
| import org.apache.commons.pool2.PooledObject; |
| import org.apache.commons.pool2.impl.DefaultPooledObject; |
| import org.apache.commons.pool2.impl.GenericObjectPool; |
| import org.apache.commons.pool2.impl.GenericObjectPoolConfig; |
| import org.apache.openjpa.lib.conf.Configurable; |
| import org.apache.openjpa.lib.log.Log; |
| import org.apache.openjpa.lib.util.J2DoPrivHelper; |
| import org.apache.openjpa.lib.util.Localizer; |
| import org.apache.openjpa.lib.util.StringUtil; |
| import org.apache.openjpa.util.GeneralException; |
| import org.apache.openjpa.util.InternalException; |
| import org.apache.openjpa.util.Serialization; |
| |
| |
| /** |
| * TCP-based implementation of {@link RemoteCommitProvider} that |
| * listens for object modifications and propagates those changes to |
| * other RemoteCommitProviders over TCP sockets. |
| * |
| * @author Brian Leair |
| * @author Patrick Linskey |
| * @since 0.2.5.0 |
| */ |
| public class TCPRemoteCommitProvider |
| extends AbstractRemoteCommitProvider |
| implements Configurable { |
| |
| private static final int DEFAULT_PORT = 5636; |
| |
| private static final Localizer s_loc = Localizer.forPackage |
| (TCPRemoteCommitProvider.class); |
| private static long s_idSequence = System.currentTimeMillis(); |
| |
| // A map of listen ports to listeners in this JVM. We might |
| // want to look into allowing same port, different interface -- |
| // that is not currently possible in a single JVM. |
| private static final Map<String, TCPPortListener> s_portListenerMap = new HashMap<>(); |
| |
| private long _id; |
| private byte[] _localhost; |
| private int _port = DEFAULT_PORT; |
| private int _maxTotal = 2; |
| private int _maxIdle = 2; |
| private int _recoveryTimeMillis = 15000; |
| private TCPPortListener _listener; |
| private BroadcastQueue _broadcastQueue = new BroadcastQueue(); |
| private final List<BroadcastWorkerThread> _broadcastThreads = Collections.synchronizedList( |
| new LinkedList<>()); |
| |
| private List<HostAddress> _addresses = new ArrayList<>(); |
| private ReentrantLock _addressesLock; |
| |
| public TCPRemoteCommitProvider() |
| throws UnknownHostException { |
| // obtain a unique ID. |
| synchronized (TCPRemoteCommitProvider.class) { |
| _id = s_idSequence++; |
| } |
| |
| // cache the local IP address. |
| _localhost = InetAddress.getLocalHost().getAddress(); |
| _addressesLock = new ReentrantLock(); |
| setNumBroadcastThreads(2); |
| } |
| |
| /** |
| * The port that this provider should listen on. |
| */ |
| public int getPort() { |
| return _port; |
| } |
| |
| /** |
| * The port that this provider should listen on. Set once only. |
| */ |
| public void setPort(int port) { |
| _port = port; |
| } |
| |
| /** |
| * The number of milliseconds to wait before retrying |
| * to reconnect to a peer after it becomes unreachable. |
| */ |
| public void setRecoveryTimeMillis(int recoverytime) { |
| _recoveryTimeMillis = recoverytime; |
| } |
| |
| /** |
| * The number of milliseconds to wait before retrying |
| * to reconnect to a peer after it becomes unreachable. |
| */ |
| public int getRecoveryTimeMillis() { |
| return _recoveryTimeMillis; |
| } |
| |
| /** |
| * The maximum number of sockets that this provider can |
| * simetaneously open to each peer in the cluster. |
| * |
| * @deprecated please use {@link TCPRemoteCommitProvider#setMaxTotal(int)} instead |
| */ |
| @Deprecated |
| public void setMaxActive(int maxActive) { |
| log.warn("This method should not be used"); |
| _maxTotal = maxActive; |
| } |
| |
| /** |
| * The maximum total number of sockets that this provider can |
| * simetaneously open to each peer in the cluster. |
| */ |
| public void setMaxTotal(int maxTotal) { |
| _maxTotal = maxTotal; |
| } |
| |
| /** |
| * The maximum number of sockets that this provider can |
| * simetaneously open to each peer in the cluster. |
| */ |
| public int getMaxTotal() { |
| return _maxTotal; |
| } |
| |
| /** |
| * The number of idle sockets that this provider can keep open |
| * to each peer in the cluster. |
| */ |
| public void setMaxIdle(int maxIdle) { |
| _maxIdle = maxIdle; |
| } |
| |
| /** |
| * The number of idle sockets that this provider can keep open |
| * to each peer in the cluster. |
| */ |
| public int getMaxIdle() { |
| return _maxIdle; |
| } |
| |
| /** |
| * The number of worker threads that are used for |
| * transmitting packets to peers in the cluster. |
| */ |
| public void setNumBroadcastThreads(int numBroadcastThreads) { |
| synchronized (_broadcastThreads) { |
| int cur = _broadcastThreads.size(); |
| if (cur > numBroadcastThreads) { |
| // Notify the extra worker threads so they stop themselves |
| // Threads will not end until they send another pk. |
| for (int i = numBroadcastThreads; i < cur; i++) { |
| BroadcastWorkerThread worker = _broadcastThreads.remove(0); |
| worker.setRunning(false); |
| } |
| } else if (cur < numBroadcastThreads) { |
| // Create additional worker threads |
| for (int i = cur; i < numBroadcastThreads; i++) { |
| BroadcastWorkerThread wt = new BroadcastWorkerThread(); |
| wt.setDaemon(true); |
| wt.start(); |
| _broadcastThreads.add(wt); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The number of worker threads that are used for |
| * transmitting packets to peers in the cluster. |
| */ |
| public int getNumBroadcastThreads() { |
| return _broadcastThreads.size(); |
| } |
| |
| /** |
| * Sets the list of addresses of peers to which this provider will |
| * send events to. The peers are semicolon-separated <code>names</code> |
| * list in the form of "myhost1:portA;myhost2:portB". |
| */ |
| public void setAddresses(String names) |
| throws UnknownHostException { |
| // NYI. Could look for equivalence of addresses and avoid |
| // changing those that didn't change. |
| |
| _addressesLock.lock(); |
| try { |
| for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) { |
| iter.next().close(); |
| } |
| String[] toks = StringUtil.split(names, ";", 0); |
| _addresses = new ArrayList<>(toks.length); |
| |
| InetAddress localhost = InetAddress.getLocalHost(); |
| String localhostName = localhost.getHostName(); |
| |
| for (int i = 0; i < toks.length; i++) { |
| String host = toks[i]; |
| String hostname; |
| int tmpPort; |
| int colon = host.indexOf(':'); |
| if (colon != -1) { |
| hostname = host.substring(0, colon); |
| tmpPort = Integer.parseInt(host.substring(colon + 1)); |
| } else { |
| hostname = host; |
| tmpPort = DEFAULT_PORT; |
| } |
| InetAddress tmpAddress = AccessController |
| .doPrivileged(J2DoPrivHelper.getByNameAction(hostname)); |
| |
| // bleair: For each address we would rather make use of |
| // the jdk1.4 isLinkLocalAddress () || isLoopbackAddress (). |
| // (Though in practice on win32 they don't work anyways!) |
| // Instead we will check hostname. Not perfect, but |
| // it will match often enough (people will typically |
| // use the DNS machine names and be cutting/pasting.) |
| if (localhostName.equals(hostname)) { |
| // This string matches the hostname for for ourselves, we |
| // don't actually need to send ourselves messages. |
| if (log.isTraceEnabled()) { |
| log.trace(s_loc.get("tcp-address-asself", |
| tmpAddress.getHostName() + ":" + tmpPort)); |
| } |
| } else { |
| HostAddress newAddress = new HostAddress(host); |
| _addresses.add(newAddress); |
| if (log.isTraceEnabled()) { |
| log.trace(s_loc.get("tcp-address-set", |
| newAddress._address.getHostName() + ":" |
| + newAddress._port)); |
| } |
| } |
| } |
| } catch (PrivilegedActionException pae) { |
| throw (UnknownHostException) pae.getException(); |
| } finally { |
| _addressesLock.unlock(); |
| } |
| } |
| |
| // ---------- Configurable implementation ---------- |
| |
| /** |
| * Subclasses that need to perform actions in |
| * {@link Configurable#endConfiguration} must invoke this method. |
| */ |
| @Override |
| public void endConfiguration() { |
| super.endConfiguration(); |
| synchronized (s_portListenerMap) { |
| // see if a listener exists for this port. |
| _listener = s_portListenerMap.get |
| (String.valueOf(_port)); |
| |
| if (_listener == null || |
| (!_listener.isRunning() && _listener._port == _port)) { |
| try { |
| _listener = new TCPPortListener(_port, log); |
| _listener.listen(); |
| s_portListenerMap.put(String.valueOf(_port), _listener); |
| } catch (Exception e) { |
| throw new GeneralException(s_loc.get("tcp-init-exception", |
| String.valueOf(_port)), e).setFatal(true); |
| } |
| } else if (_listener.isRunning()) { |
| if (_listener._port != _port) { |
| // this really shouldn't be able to happen. |
| throw new GeneralException(s_loc.get |
| ("tcp-not-equal", String.valueOf(_port))). |
| setFatal(true); |
| } |
| } else { |
| throw new InternalException(s_loc.get("tcp-listener-broken")); |
| } |
| _listener.addProvider(this); |
| } |
| |
| _addressesLock.lock(); |
| try { |
| HostAddress curAddress; |
| for (Iterator<HostAddress> iter = _addresses.iterator(); |
| iter.hasNext();) { |
| curAddress = iter.next(); |
| curAddress.setMaxTotal(_maxTotal); |
| curAddress.setMaxIdle(_maxIdle); |
| } |
| } |
| finally { |
| _addressesLock.unlock(); |
| } |
| } |
| |
| // ---------- RemoteCommitProvider implementation ---------- |
| |
| // pre 3.3.4 = <no version number transmitted> |
| // 3.3 Preview = 0x1428acfd; |
| // 3.4 = 0x1428acff; |
| private static final long PROTOCOL_VERSION = 0x1428acff; |
| |
| @Override |
| public void broadcast(RemoteCommitEvent event) { |
| try { |
| // build a packet notifying other JVMs of object changes. |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ObjectOutputStream oos = new ObjectOutputStream(baos); |
| |
| oos.writeLong(PROTOCOL_VERSION); |
| oos.writeLong(_id); |
| oos.writeInt(_port); |
| oos.writeObject(_localhost); |
| oos.writeObject(event); |
| oos.flush(); |
| |
| byte[] bytes = baos.toByteArray(); |
| baos.close(); |
| if (_broadcastThreads.isEmpty()) { |
| sendUpdatePacket(bytes); |
| } else { |
| _broadcastQueue.addPacket(bytes); |
| } |
| } catch (IOException ioe) { |
| if (log.isWarnEnabled()) { |
| log.warn(s_loc.get("tcp-payload-create-error"), ioe); |
| } |
| } |
| } |
| |
| /** |
| * Sends a change notification packet to other machines in this |
| * provider cluster. |
| */ |
| private void sendUpdatePacket(byte[] bytes) { |
| _addressesLock.lock(); |
| try { |
| for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) { |
| iter.next().sendUpdatePacket(bytes); |
| } |
| } finally { |
| _addressesLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void close() { |
| if (_listener != null) { |
| _listener.removeProvider(this); |
| } |
| |
| // Remove Broadcast Threads then close sockets. |
| _broadcastQueue.close(); |
| |
| // Wait for _broadcastThreads to get cleaned up. |
| while(!_broadcastThreads.isEmpty()) { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException ie) { |
| // Ignore. |
| } |
| } |
| |
| _addressesLock.lock(); |
| try { |
| for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) { |
| iter.next().close(); |
| } |
| } finally { |
| _addressesLock.unlock(); |
| } |
| } |
| |
| /** |
| * Utility class to hold messages to be sent. This |
| * allows calls to broadcast () to return without |
| * waiting for the send to complete. |
| */ |
| private static class BroadcastQueue { |
| |
| private LinkedList<byte[]> _packetQueue = new LinkedList<>(); |
| private boolean _closed = false; |
| |
| public synchronized void close() { |
| _closed = true; |
| notifyAll(); |
| } |
| |
| public synchronized boolean isClosed() { |
| return _closed; |
| } |
| |
| public synchronized void addPacket(byte[] bytes) { |
| _packetQueue.addLast(bytes); |
| notify(); |
| } |
| |
| /** |
| * @return the bytes defining the packet to process, or |
| * <code>null</code> if the queue is empty. |
| */ |
| public synchronized byte[] removePacket() |
| throws InterruptedException { |
| // only wait if the queue is still open. This allows processing |
| // of events in the queue to continue, while avoiding sleeping |
| // during shutdown. |
| while (!_closed && _packetQueue.isEmpty()) { |
| wait(); |
| } |
| if (_packetQueue.isEmpty()) { |
| return null; |
| } else { |
| return _packetQueue.removeFirst(); |
| } |
| } |
| } |
| |
| /** |
| * Threads to broadcast packets placed in the {@link BroadcastQueue}. |
| */ |
| private class BroadcastWorkerThread |
| extends Thread { |
| |
| private boolean _keepRunning = true; |
| |
| @Override |
| public void run() { |
| while (_keepRunning) { |
| try { |
| // This will block until there is a packet to send, or |
| // until the queue is closed. |
| byte[] bytes = _broadcastQueue.removePacket(); |
| if (bytes != null) { |
| sendUpdatePacket(bytes); |
| } else if (_broadcastQueue.isClosed()) { |
| _keepRunning = false; |
| } |
| } catch (InterruptedException e) { |
| // End the thread. |
| break; |
| } |
| } |
| remove(); |
| } |
| |
| public void setRunning(boolean keepRunning) { |
| _keepRunning = keepRunning; |
| } |
| |
| private void remove() { |
| _broadcastThreads.remove(this); |
| } |
| } |
| |
| /** |
| * Responsible for listening for incoming packets and processing them. |
| */ |
| private static class TCPPortListener |
| implements Runnable { |
| |
| private final Log _log; |
| private ServerSocket _receiveSocket; |
| private Thread _acceptThread; |
| private Set<Thread> _receiverThreads = new HashSet<>(); |
| private final Set<TCPRemoteCommitProvider> _providers = new HashSet<>(); |
| |
| /** |
| * Cache the local IP address |
| */ |
| private byte[] _localhost; |
| |
| /** |
| * The port that this listener should listen on. Configured |
| * by TCPRemoteCommitProvider. |
| */ |
| private int _port; |
| |
| /** |
| * Should be set to <code>true</code> once the listener is listening. |
| */ |
| private boolean _isRunning = false; |
| |
| /** |
| * Construct a new TCPPortListener configured to use the specified port. |
| */ |
| private TCPPortListener(int port, Log log) |
| throws IOException { |
| _port = port; |
| _log = log; |
| try { |
| _receiveSocket = AccessController |
| .doPrivileged(J2DoPrivHelper.newServerSocketAction(_port)); |
| } catch (PrivilegedActionException pae) { |
| throw (IOException) pae.getException(); |
| } |
| _localhost = InetAddress.getLocalHost().getAddress(); |
| |
| if (_log.isTraceEnabled()) { |
| _log.info(s_loc.get("tcp-start-listener", |
| String.valueOf(_port))); |
| } |
| } |
| |
| private void listen() { |
| _acceptThread = new Thread(this); |
| _acceptThread.setDaemon(true); |
| _acceptThread.start(); |
| } |
| |
| /** |
| * All providers added here will be notified of any incoming |
| * provider messages. There will be one of these per |
| * BrokerFactory in a given JVM. |
| * {@link TCPRemoteCommitProvider#endConfiguration} invokes |
| * <code>addProvider</code> with <code>this</code> upon |
| * completion of configuration. |
| */ |
| private void addProvider(TCPRemoteCommitProvider provider) { |
| synchronized (_providers) { |
| _providers.add(provider); |
| } |
| } |
| |
| /** |
| * Remove a provider from the list of providers to notify of |
| * commit events. |
| */ |
| private synchronized void removeProvider |
| (TCPRemoteCommitProvider provider) { |
| synchronized (_providers) { |
| _providers.remove(provider); |
| |
| // if the provider list is empty, shut down the thread. |
| if (_providers.size() == 0) { |
| _isRunning = false; |
| try { |
| _receiveSocket.close(); |
| } catch (IOException ioe) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-close-error"), ioe); |
| } |
| } |
| _acceptThread.interrupt(); |
| } |
| } |
| } |
| |
| private boolean isRunning() { |
| synchronized (_providers) { |
| return _isRunning; |
| } |
| } |
| |
| @Override |
| public void run() { |
| synchronized (_providers) { |
| _isRunning = true; |
| } |
| |
| Socket s = null; |
| while (_isRunning) { |
| try { |
| s = null; |
| // Block, waiting to accept new connection from a peer |
| s = AccessController.doPrivileged(J2DoPrivHelper |
| .acceptAction(_receiveSocket)); |
| if (_log.isTraceEnabled()) { |
| _log.trace(s_loc.get("tcp-received-connection", |
| s.getInetAddress().getHostAddress() |
| + ":" + s.getPort())); |
| } |
| ReceiveSocketHandler sh = new ReceiveSocketHandler(s); |
| Thread receiverThread = new Thread(sh); |
| receiverThread.setDaemon(true); |
| receiverThread.start(); |
| _receiverThreads.add(receiverThread); |
| } catch (Exception e) { |
| if (e instanceof PrivilegedActionException) { |
| e = ((PrivilegedActionException) e).getException(); |
| } |
| if (!(e instanceof SocketException) || _isRunning) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-accept-error"), e); |
| } |
| } |
| |
| // Nominal case (InterruptedException) because close () |
| // calls _acceptThread.interrupt (); |
| try { |
| if (s != null) { |
| s.close(); |
| } |
| } catch (Exception ee) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-close-error"), e); |
| } |
| } |
| } |
| } |
| |
| // We are done listening. Interrupt any worker threads. |
| Thread worker; |
| for (Iterator<Thread> iter = _receiverThreads.iterator(); |
| iter.hasNext();) { |
| worker = iter.next(); |
| // FYI, the worker threads are blocked |
| // reading from the socket's InputStream. InputStreams |
| // aren't interruptable, so this interrupt isn't |
| // really going to be delivered until something breaks |
| // the InputStream. |
| worker.interrupt(); |
| } |
| synchronized (_providers) { |
| try { |
| if (_isRunning) { |
| _receiveSocket.close(); |
| } |
| } catch (Exception e) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-close-error"), e); |
| } |
| } |
| _isRunning = false; |
| if (_log.isTraceEnabled()) { |
| _log.trace(s_loc.get("tcp-close-listener", |
| _port + "")); |
| } |
| } |
| } |
| |
| /** |
| * Utility class that acts as a worker thread to receive Events |
| * from broadcasters. |
| */ |
| private class ReceiveSocketHandler |
| implements Runnable { |
| |
| private InputStream _in; |
| private Socket _s; |
| |
| private ReceiveSocketHandler(Socket s) { |
| // We are the receiving end and we don't send any messages |
| // back to the broadcaster. Turn off Nagle's so that |
| // we will send ack packets without waiting. |
| _s = s; |
| try { |
| _s.setTcpNoDelay(true); |
| _in = new BufferedInputStream(s.getInputStream()); |
| } catch (IOException ioe) { |
| if (_log.isInfoEnabled()) { |
| _log.info(s_loc.get("tcp-socket-option-error"), ioe); |
| } |
| _s = null; |
| } catch (Exception e) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-receive-error"), e); |
| } |
| _s = null; |
| } |
| } |
| |
| @Override |
| public void run() { |
| if (_s == null) { |
| return; |
| } |
| while (_isRunning && _s != null) { |
| try { |
| // This will block our thread, waiting to read |
| // the next Event-object-message. |
| handle(_in); |
| } catch (EOFException eof) { |
| // EOFException raised when peer is properly |
| // closing its end. |
| if (_log.isTraceEnabled()) { |
| _log.trace(s_loc.get("tcp-close-socket", |
| _s.getInetAddress().getHostAddress() |
| + ":" + _s.getPort())); |
| } |
| break; |
| } catch (Throwable e) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-receive-error"), e); |
| } |
| break; |
| } |
| } |
| // We are done receiving on this socket and this worker |
| // thread is terminating. |
| try { |
| _in.close(); |
| if (_s != null) { |
| _s.close(); |
| } |
| } catch (IOException e) { |
| _log.warn(s_loc.get("tcp-close-socket-error", |
| _s.getInetAddress().getHostAddress() + ":" |
| + _s.getPort()), e); |
| } |
| } |
| |
| /** |
| * Process an {@link InputStream} containing objects written |
| * by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}. |
| */ |
| private void handle(InputStream in) |
| throws IOException, ClassNotFoundException { |
| // This will block waiting for the next |
| ObjectInputStream ois = |
| new Serialization.ClassResolvingObjectInputStream(in); |
| |
| long protocolVersion = ois.readLong(); |
| if (protocolVersion != PROTOCOL_VERSION) { |
| if (_log.isWarnEnabled()) { |
| _log.warn(s_loc.get("tcp-wrong-version-error", |
| _s.getInetAddress().getHostAddress() + ":" |
| + _s.getPort())); |
| return; |
| } |
| } |
| |
| long senderId = ois.readLong(); |
| int senderPort = ois.readInt(); |
| byte[] senderAddress = (byte[]) ois.readObject(); |
| RemoteCommitEvent rce = (RemoteCommitEvent) ois.readObject(); |
| if (_log.isTraceEnabled()) { |
| _log.trace(s_loc.get("tcp-received-event", |
| _s.getInetAddress().getHostAddress() + ":" |
| + _s.getPort())); |
| } |
| |
| boolean fromSelf = senderPort == _port && |
| Arrays.equals(senderAddress, _localhost); |
| TCPRemoteCommitProvider provider; |
| synchronized (_providers) { |
| // bleair: We're iterating, but currenlty there can really |
| // only be a single provider. |
| for (Iterator<TCPRemoteCommitProvider> iter = _providers.iterator(); |
| iter.hasNext();) { |
| provider = iter.next(); |
| if (senderId != provider._id || !fromSelf) { |
| provider.eventManager.fireEvent(rce); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Utility class to store an InetAddress and an int. Not using |
| * InetSocketAddress because it's a JDK1.4 API. This also |
| * provides a wrapper around the socket(s) associated with this address. |
| */ |
| private class HostAddress { |
| |
| private InetAddress _address; |
| private int _port; |
| private long _timeLastError; // millis |
| private boolean _isAvailable; // is peer thought to be up |
| private int _infosIssued = 0; // limit log entries |
| |
| private GenericObjectPool<Socket> _socketPool; // reusable open sockets |
| |
| /** |
| * Construct a new host address from a string of the form |
| * "host:port" or of the form "host". |
| */ |
| private HostAddress(String host) |
| throws UnknownHostException { |
| int colon = host.indexOf(':'); |
| try { |
| if (colon != -1) { |
| _address = AccessController |
| .doPrivileged(J2DoPrivHelper.getByNameAction(host |
| .substring(0, colon))); |
| _port = Integer.parseInt(host.substring(colon + 1)); |
| } else { |
| _address = AccessController |
| .doPrivileged(J2DoPrivHelper.getByNameAction(host)); |
| _port = DEFAULT_PORT; |
| } |
| } catch (PrivilegedActionException pae) { |
| throw (UnknownHostException) pae.getException(); |
| } |
| GenericObjectPoolConfig<Socket> cfg = new GenericObjectPoolConfig<>(); |
| cfg.setMaxTotal(_maxTotal); |
| cfg.setBlockWhenExhausted(true); |
| cfg.setMaxWaitMillis(-1L); |
| // -1 max wait == as long as it takes |
| _socketPool = new GenericObjectPool<>(new SocketPoolableObjectFactory(), cfg); |
| _isAvailable = true; |
| } |
| |
| private void setMaxTotal(int maxTotal) { |
| _socketPool.setMaxTotal(maxTotal); |
| } |
| |
| private void setMaxIdle(int maxIdle) { |
| _socketPool.setMaxIdle(maxIdle); |
| } |
| |
| public void close() { |
| // Close the pool of sockets to this peer. This |
| // will close all sockets in the pool. |
| try { |
| _socketPool.close(); |
| } catch (Exception e) { |
| if (log.isWarnEnabled()) { |
| log.warn(s_loc.get("tcp-close-pool-error"), e); |
| } |
| } |
| } |
| |
| private void sendUpdatePacket(byte[] bytes) { |
| if (!_isAvailable) { |
| long now = System.currentTimeMillis(); |
| if (now - _timeLastError < _recoveryTimeMillis) { |
| // Not enough time has passed since the last error |
| return; |
| } |
| } |
| Socket s = null; |
| try { |
| s = getSocket(); |
| OutputStream os = s.getOutputStream(); |
| os.write(bytes); |
| os.flush(); |
| |
| if (log.isTraceEnabled()) { |
| log.trace(s_loc.get("tcp-sent-update", |
| _address.getHostAddress() + ":" + _port, |
| String.valueOf(s.getLocalPort()))); |
| } |
| _isAvailable = true; |
| _infosIssued = 0; |
| // Return the socket to the pool; the socket is |
| // still good. |
| returnSocket(s); |
| } catch (Exception e) { |
| // There has been a problem sending to the peer. |
| // The OS socket that was being used is can no longer |
| // be used. |
| if (s != null) { |
| this.closeSocket(s); |
| } |
| this.clearAllSockets(); |
| |
| if (_isAvailable) { |
| // Log a warning, the peer was up and has now gone down |
| if (log.isWarnEnabled()) { |
| log.warn(s_loc.get("tcp-send-error", |
| _address.getHostAddress() + ":" + _port), e); |
| } |
| _isAvailable = false; |
| // Once enough time has passed we will log another warning |
| _timeLastError = System.currentTimeMillis(); |
| } else { |
| long now = System.currentTimeMillis(); |
| if (now - _timeLastError > _recoveryTimeMillis) { |
| if (_infosIssued < 5) { |
| // Enough time has passed, and peer is still down |
| _timeLastError = System.currentTimeMillis(); |
| // We were trying to reestablish the connection, |
| // but we failed again. Log a message, but |
| // lower severity. This log will occur periodically |
| // for 5 times until the peer comes back. |
| if (log.isInfoEnabled()) { |
| log.info(s_loc.get("tcp-send-still-error", |
| _address.getHostAddress() + ":" |
| + _port), e); |
| } |
| _infosIssued++; |
| } |
| } |
| } |
| } |
| } |
| |
| private Socket getSocket() |
| throws Exception { |
| return _socketPool.borrowObject(); |
| } |
| |
| private void returnSocket(Socket s) |
| throws Exception { |
| _socketPool.returnObject(s); |
| } |
| |
| private void clearAllSockets() { |
| _socketPool.clear(); |
| } |
| |
| private void closeSocket(Socket s) { |
| // All sockets come from the pool. |
| // This socket is no longer usable, so delete it from the |
| // pool. |
| try { |
| _socketPool.invalidateObject(s); |
| } catch (Exception e) { |
| } |
| } |
| |
| /** |
| * Factory for pooled sockets. |
| */ |
| private class SocketPoolableObjectFactory extends BasePooledObjectFactory<Socket> { |
| @Override |
| public Socket create() throws Exception { |
| try { |
| Socket s = AccessController |
| .doPrivileged(J2DoPrivHelper.newSocketAction(_address, |
| _port)); |
| if (log.isTraceEnabled()) { |
| log.trace(s_loc.get("tcp-open-connection", _address |
| + ":" + _port, "" + s.getLocalPort())); |
| } |
| return s; |
| } catch (PrivilegedActionException pae) { |
| throw (IOException) pae.getException(); |
| } |
| } |
| |
| @Override |
| public PooledObject<Socket> wrap(Socket obj) { |
| return new DefaultPooledObject<>(obj); |
| } |
| |
| @Override |
| public void destroyObject(PooledObject<Socket> p) throws Exception { |
| try { |
| Socket s = p.getObject(); |
| if (log.isTraceEnabled()) { |
| log.trace(s_loc.get("tcp-close-sending-socket", |
| _address + ":" + _port, "" + s.getLocalPort())); |
| } |
| s.close(); |
| } catch (Exception e) { |
| log.warn(s_loc.get("tcp-close-socket-error", |
| _address.getHostAddress() + ":" + _port), e); |
| } |
| } |
| } |
| } |
| } |