[OPENJPA-2801] Introducing KubernetesTCPRemoteCommitProvider
diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java b/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java
new file mode 100644
index 0000000..bb482d0
--- /dev/null
+++ b/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+import org.apache.openjpa.lib.util.J2DoPrivHelper;
+
+public abstract class DynamicTCPRemoteCommitProvider extends TCPRemoteCommitProvider {
+
+    private int _cacheDurationMillis = 30000;
+
+    public DynamicTCPRemoteCommitProvider() throws UnknownHostException {
+        super();
+    }
+
+    public int getCacheDurationMillis() {
+        return _cacheDurationMillis;
+    }
+
+    public void setCacheDurationMillis(final int _cacheDurationMillis) {
+        this._cacheDurationMillis = _cacheDurationMillis;
+    }
+
+    @Override
+    public final void setAddresses(final String names) throws UnknownHostException {
+        throw new UnknownHostException("Do not set Addresses on this instance; "
+                + "did you expect " + TCPRemoteCommitProvider.class.getSimpleName() + " ?");
+    }
+
+    @Override
+    public void endConfiguration() {
+        TcpAddressesUpdater updater = new TcpAddressesUpdater();
+        updater.run();
+
+        Timer timer = new Timer(true);
+        timer.scheduleAtFixedRate(updater, 0, _cacheDurationMillis);
+
+        super.endConfiguration();
+    }
+
+    protected abstract List<String> fetchDynamicAddresses();
+
+    private class TcpAddressesUpdater extends TimerTask {
+
+        @Override
+        public void run() {
+            List<String> dynamicAddresses = fetchDynamicAddresses();
+
+            _addressesLock.lock();
+            try {
+                String localhostAddress = InetAddress.getLocalHost().getHostAddress();
+
+                for (String dynamic : dynamicAddresses) {
+                    InetAddress tmpAddress = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(dynamic));
+
+                    if (localhostAddress.equals(dynamic)) {
+                        // This string matches the hostname for for ourselves, we
+                        // don't actually need to send ourselves messages.
+                        if (log.isTraceEnabled()) {
+                            log.trace(s_loc.get("tcp-address-asself", tmpAddress.getHostAddress() + ":" + _port));
+                        }
+                    } else {
+                        HostAddress podAddress = new HostAddress(dynamic);
+                        if (_addresses.contains(podAddress)) {
+                            if (log.isTraceEnabled()) {
+                                log.trace(s_loc.get("dyntcp-address-not-set",
+                                        podAddress.getAddress().getHostAddress() + ":" + podAddress.getPort()));
+                            }
+                        } else {
+                            _addresses.add(podAddress);
+
+                            if (log.isTraceEnabled()) {
+                                log.trace(s_loc.get("dyntcp-address-set",
+                                        podAddress.getAddress().getHostAddress() + ":" + podAddress.getPort()));
+                            }
+                        }
+                    }
+                }
+
+                List<HostAddress> toCloseAndRemove = _addresses.stream().
+                        filter(address -> !dynamicAddresses.contains(address.getAddress().getHostAddress())).
+                        collect(Collectors.toList());
+                toCloseAndRemove.forEach(address -> {
+                    address.close();
+                    _addresses.remove(address);
+
+                    if (log.isTraceEnabled()) {
+                        log.trace(s_loc.get("tcp-address-unset",
+                                address.getAddress().getHostAddress() + ":" + address.getPort()));
+                    }
+                });
+            } catch (Exception e) {
+                if (log.isErrorEnabled()) {
+                    log.error(s_loc.get("dyntcp-updater-error"), e);
+                }
+            } finally {
+                _addressesLock.unlock();
+            }
+        }
+    }
+}
diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java b/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
index b7f8b9f..ab4793c 100644
--- a/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
+++ b/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
@@ -38,10 +38,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -75,8 +75,7 @@
 
     private static final int DEFAULT_PORT = 5636;
 
-    private static final Localizer s_loc = Localizer.forPackage
-        (TCPRemoteCommitProvider.class);
+    protected static final Localizer s_loc = Localizer.forPackage(TCPRemoteCommitProvider.class);
     private static long s_idSequence = System.currentTimeMillis();
 
     //	A map of listen ports to listeners in this JVM. We might
@@ -84,22 +83,20 @@
     //	that is not currently possible in a single JVM.
     private static final Map<String, TCPPortListener> s_portListenerMap = new HashMap<>();
 
-    private long _id;
-    private byte[] _localhost;
-    private int _port = DEFAULT_PORT;
+    private final long _id;
+    private final byte[] _localhost;
+    protected int _port = DEFAULT_PORT;
     private int _maxTotal = 2;
     private int _maxIdle = 2;
     private int _recoveryTimeMillis = 15000;
     private TCPPortListener _listener;
-    private BroadcastQueue _broadcastQueue = new BroadcastQueue();
-    private final List<BroadcastWorkerThread> _broadcastThreads = Collections.synchronizedList(
-        new LinkedList<>());
+    private final BroadcastQueue _broadcastQueue = new BroadcastQueue();
+    private final List<BroadcastWorkerThread> _broadcastThreads = Collections.synchronizedList(new LinkedList<>());
 
-    private List<HostAddress> _addresses = new ArrayList<>();
-    private ReentrantLock _addressesLock;
+    protected List<HostAddress> _addresses = new ArrayList<>();
+    protected final ReentrantLock _addressesLock;
 
-    public TCPRemoteCommitProvider()
-        throws UnknownHostException {
+    public TCPRemoteCommitProvider() throws UnknownHostException {
         // obtain a unique ID.
         synchronized (TCPRemoteCommitProvider.class) {
             _id = s_idSequence++;
@@ -112,84 +109,90 @@
     }
 
     /**
-     * The port that this provider should listen on.
+     * @return the port that this provider should listen on.
      */
     public int getPort() {
         return _port;
     }
 
     /**
-     * The port that this provider should listen on. Set once only.
+     * Set the port that this provider should listen on. Set once only.
+     *
+     * @param port the port that this provider should listen on
      */
-    public void setPort(int port) {
+    public void setPort(final int port) {
         _port = port;
     }
 
     /**
-     * The number of milliseconds to wait before retrying
-     * to reconnect to a peer after it becomes unreachable.
+     * Set the number of milliseconds to wait before retrying to reconnect to a peer after it becomes unreachable.
+     * 
+     * @param recoverytime the number of milliseconds to wait before retrying to reconnect to a peer after it becomes
+     * unreachable
      */
-    public void setRecoveryTimeMillis(int recoverytime) {
+    public void setRecoveryTimeMillis(final int recoverytime) {
         _recoveryTimeMillis = recoverytime;
     }
 
     /**
-     * The number of milliseconds to wait before retrying
-     * to reconnect to a peer after it becomes unreachable.
+     * @return the number of milliseconds to wait before retrying to reconnect to a peer after it becomes unreachable.
      */
     public int getRecoveryTimeMillis() {
         return _recoveryTimeMillis;
     }
 
     /**
-     * The maximum number of sockets that this provider can
-     * simetaneously open to each peer in the cluster.
+     * Set the maximum number of sockets that this provider can simultaneously open to each peer in the cluster.
      *
-     * @deprecated please use {@link TCPRemoteCommitProvider#setMaxTotal(int)} instead
+     * @param maxActive the maximum total number of sockets that this provider can simultaneously open to each peer in
+     * the cluster.     * @deprecated please use {@link TCPRemoteCommitProvider#setMaxTotal(int)} instead
      */
     @Deprecated
-    public void setMaxActive(int maxActive) {
+    public void setMaxActive(final int maxActive) {
         log.warn("This method should not be used");
         _maxTotal = maxActive;
     }
 
     /**
-     * The maximum total number of sockets that this provider can
-     * simetaneously open to each peer in the cluster.
+     * Set the maximum total number of sockets that this provider can simultaneously open to each peer in the cluster.
+     * 
+     * @param maxTotal the maximum total number of sockets that this provider can simultaneously open to each peer in
+     * the cluster.
      */
-    public void setMaxTotal(int maxTotal) {
+    public void setMaxTotal(final int maxTotal) {
         _maxTotal = maxTotal;
     }
 
     /**
-     * The maximum number of sockets that this provider can
-     * simetaneously open to each peer in the cluster.
+     * @return the maximum number of sockets that this provider can simultaneously open to each peer in the cluster.
      */
     public int getMaxTotal() {
         return _maxTotal;
     }
 
     /**
-     * The number of idle sockets that this provider can keep open
-     * to each peer in the cluster.
+     * Set the number of idle sockets that this provider can keep open to each peer in the cluster.
+     * 
+     * @param maxIdle the number of idle sockets that this provider can keep open to each peer in the cluster
      */
-    public void setMaxIdle(int maxIdle) {
+    public void setMaxIdle(final int maxIdle) {
         _maxIdle = maxIdle;
     }
 
     /**
-     * The number of idle sockets that this provider can keep open
-     * to each peer in the cluster.
+     * @return the number of idle sockets that this provider can keep open to each peer in the cluster.
      */
     public int getMaxIdle() {
         return _maxIdle;
     }
 
     /**
-     * The number of worker threads that are used for
-     * transmitting packets to peers in the cluster.
+     * Set the number of worker threads that are used for transmitting packets to peers in the cluster.
+     * 
+     * @param numBroadcastThreads the number of worker threads that are used for transmitting packets to peers in the
+     * cluster
      */
-    public void setNumBroadcastThreads(int numBroadcastThreads) {
+    public void setNumBroadcastThreads(final int numBroadcastThreads) {
         synchronized (_broadcastThreads) {
             int cur = _broadcastThreads.size();
             if (cur > numBroadcastThreads) {
@@ -212,36 +215,33 @@
     }
 
     /**
-     * The number of worker threads that are used for
-     * transmitting packets to peers in the cluster.
+     * @return the number of worker threads that are used for transmitting packets to peers in the cluster.
      */
     public int getNumBroadcastThreads() {
         return _broadcastThreads.size();
     }
 
     /**
-     * Sets the list of addresses of peers to which this provider will
-     * send events to. The peers are semicolon-separated <code>names</code>
-     * list in the form of "myhost1:portA;myhost2:portB".
+     * Sets the list of addresses of peers to which this provider will send events to.
+     * The peers are semicolon-separated <code>names</code> list in the form of "myhost1:portA;myhost2:portB".
+     * 
+     * @param names the list of addresses of peers to which this provider will send events to
+     * @throws UnknownHostException in case peer name cannot be resolved
      */
-    public void setAddresses(String names)
-        throws UnknownHostException {
-        // NYI. Could look for equivalence of addresses and avoid
-        // changing those that didn't change.
+    public void setAddresses(final String names) throws UnknownHostException {
+        // NYI. Could look for equivalence of addresses and avoid changing those that didn't change.
 
         _addressesLock.lock();
         try {
-            for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
-                iter.next().close();
-            }
+            _addresses.forEach(HostAddress::close);
+
             String[] toks = StringUtil.split(names, ";", 0);
             _addresses = new ArrayList<>(toks.length);
 
             InetAddress localhost = InetAddress.getLocalHost();
             String localhostName = localhost.getHostName();
 
-            for (int i = 0; i < toks.length; i++) {
-                String host = toks[i];
+            for (String host : toks) {
                 String hostname;
                 int tmpPort;
                 int colon = host.indexOf(':');
@@ -252,9 +252,8 @@
                     hostname = host;
                     tmpPort = DEFAULT_PORT;
                 }
-                InetAddress tmpAddress = AccessController
-                    .doPrivileged(J2DoPrivHelper.getByNameAction(hostname));
-
+                InetAddress tmpAddress = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(hostname));
+                
                 // bleair: For each address we would rather make use of
                 // the jdk1.4 isLinkLocalAddress () || isLoopbackAddress ().
                 // (Though in practice on win32 they don't work anyways!)
@@ -265,16 +264,14 @@
                     // This string matches the hostname for for ourselves, we
                     // don't actually need to send ourselves messages.
                     if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-address-asself",
-                            tmpAddress.getHostName() + ":" + tmpPort));
+                        log.trace(s_loc.get("tcp-address-asself", tmpAddress.getHostName() + ":" + tmpPort));
                     }
                 } else {
                     HostAddress newAddress = new HostAddress(host);
                     _addresses.add(newAddress);
                     if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-address-set",
-                            newAddress._address.getHostName() + ":"
-                                + newAddress._port));
+                        log.trace(s_loc.get("tcp-address-set", 
+                                newAddress._address.getHostName() + ":" + newAddress._port));
                     }
                 }
             }
@@ -296,25 +293,21 @@
         super.endConfiguration();
         synchronized (s_portListenerMap) {
             // see if a listener exists for this port.
-            _listener = s_portListenerMap.get
-                (String.valueOf(_port));
+            _listener = s_portListenerMap.get(String.valueOf(_port));
 
-            if (_listener == null ||
-                (!_listener.isRunning() && _listener._port == _port)) {
+            if (_listener == null || (!_listener.isRunning() && _listener._port == _port)) {
                 try {
                     _listener = new TCPPortListener(_port, log);
                     _listener.listen();
                     s_portListenerMap.put(String.valueOf(_port), _listener);
                 } catch (Exception e) {
-                    throw new GeneralException(s_loc.get("tcp-init-exception",
-                        String.valueOf(_port)), e).setFatal(true);
+                    throw new GeneralException(s_loc.get("tcp-init-exception", String.valueOf(_port)), e).
+                            setFatal(true);
                 }
             } else if (_listener.isRunning()) {
                 if (_listener._port != _port) {
                     // this really shouldn't be able to happen.
-                    throw new GeneralException(s_loc.get
-                        ("tcp-not-equal", String.valueOf(_port))).
-                        setFatal(true);
+                    throw new GeneralException(s_loc.get("tcp-not-equal", String.valueOf(_port))).setFatal(true);
                 }
             } else {
                 throw new InternalException(s_loc.get("tcp-listener-broken"));
@@ -324,15 +317,11 @@
 
         _addressesLock.lock();
         try {
-            HostAddress curAddress;
-            for (Iterator<HostAddress> iter = _addresses.iterator();
-                iter.hasNext();) {
-                curAddress = iter.next();
+            _addresses.forEach(curAddress -> {
                 curAddress.setMaxTotal(_maxTotal);
                 curAddress.setMaxIdle(_maxIdle);
-            }
-        }
-        finally {
+            });
+        } finally {
             _addressesLock.unlock();
         }
     }
@@ -345,11 +334,10 @@
     private static final long PROTOCOL_VERSION = 0x1428acff;
 
     @Override
-    public void broadcast(RemoteCommitEvent event) {
-        try {
-            // build a packet notifying other JVMs of object changes.
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
+    public void broadcast(final RemoteCommitEvent event) {
+        // build a packet notifying other JVMs of object changes.
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos);) {
 
             oos.writeLong(PROTOCOL_VERSION);
             oos.writeLong(_id);
@@ -376,12 +364,10 @@
      * Sends a change notification packet to other machines in this
      * provider cluster.
      */
-    private void sendUpdatePacket(byte[] bytes) {
+    private void sendUpdatePacket(final byte[] bytes) {
         _addressesLock.lock();
         try {
-            for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
-                iter.next().sendUpdatePacket(bytes);
-            }
+            _addresses.forEach(address -> address.sendUpdatePacket(bytes));
         } finally {
             _addressesLock.unlock();
         }
@@ -407,9 +393,7 @@
 
         _addressesLock.lock();
         try {
-            for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
-                iter.next().close();
-            }
+            _addresses.forEach(HostAddress::close);
         } finally {
             _addressesLock.unlock();
         }
@@ -422,7 +406,7 @@
      */
     private static class BroadcastQueue {
 
-        private LinkedList<byte[]> _packetQueue = new LinkedList<>();
+        private final LinkedList<byte[]> _packetQueue = new LinkedList<>();
         private boolean _closed = false;
 
         public synchronized void close() {
@@ -434,7 +418,7 @@
             return _closed;
         }
 
-        public synchronized void addPacket(byte[] bytes) {
+        public synchronized void addPacket(final byte[] bytes) {
             _packetQueue.addLast(bytes);
             notify();
         }
@@ -443,8 +427,7 @@
          * @return the bytes defining the packet to process, or
          * <code>null</code> if the queue is empty.
          */
-        public synchronized byte[] removePacket()
-            throws InterruptedException {
+        public synchronized byte[] removePacket() throws InterruptedException {
             // only wait if the queue is still open. This allows processing
             // of events in the queue to continue, while avoiding sleeping
             // during shutdown.
@@ -487,7 +470,7 @@
             remove();
         }
 
-        public void setRunning(boolean keepRunning) {
+        public void setRunning(final boolean keepRunning) {
             _keepRunning = keepRunning;
         }
 
@@ -499,8 +482,7 @@
     /**
      * Responsible for listening for incoming packets and processing them.
      */
-    private static class TCPPortListener
-        implements Runnable {
+    private static final class TCPPortListener implements Runnable {
 
         private final Log _log;
         private ServerSocket _receiveSocket;
@@ -511,7 +493,7 @@
         /**
          * Cache the local IP address
          */
-        private byte[] _localhost;
+        private final byte[] _localhost;
 
         /**
          * The port that this listener should listen on. Configured
@@ -527,21 +509,18 @@
         /**
          * Construct a new TCPPortListener configured to use the specified port.
          */
-        private TCPPortListener(int port, Log log)
-            throws IOException {
+        private TCPPortListener(final int port, final Log log) throws IOException {
             _port = port;
             _log = log;
             try {
-                _receiveSocket = AccessController
-                    .doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
+                _receiveSocket = AccessController.doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
             } catch (PrivilegedActionException pae) {
                 throw (IOException) pae.getException();
             }
             _localhost = InetAddress.getLocalHost().getAddress();
 
             if (_log.isTraceEnabled()) {
-                _log.info(s_loc.get("tcp-start-listener",
-                    String.valueOf(_port)));
+                _log.info(s_loc.get("tcp-start-listener", String.valueOf(_port)));
             }
         }
 
@@ -552,30 +531,26 @@
         }
 
         /**
-         * All providers added here will be notified of any incoming
-         * provider messages. There will be one of these per
+         * All providers added here will be notified of any incoming provider messages. There will be one of these per
          * BrokerFactory in a given JVM.
-         * {@link TCPRemoteCommitProvider#endConfiguration} invokes
-         * <code>addProvider</code> with <code>this</code> upon
+         * {@link TCPRemoteCommitProvider#endConfiguration} invokes <code>addProvider</code> with <code>this</code> upon
          * completion of configuration.
          */
-        private void addProvider(TCPRemoteCommitProvider provider) {
+        private void addProvider(final TCPRemoteCommitProvider provider) {
             synchronized (_providers) {
                 _providers.add(provider);
             }
         }
 
         /**
-         * Remove a provider from the list of providers to notify of
-         * commit events.
+         * Remove a provider from the list of providers to notify of commit events.
          */
-        private synchronized void removeProvider
-            (TCPRemoteCommitProvider provider) {
+        private synchronized void removeProvider(final TCPRemoteCommitProvider provider) {
             synchronized (_providers) {
                 _providers.remove(provider);
 
                 // if the provider list is empty, shut down the thread.
-                if (_providers.size() == 0) {
+                if (_providers.isEmpty()) {
                     _isRunning = false;
                     try {
                         _receiveSocket.close();
@@ -606,12 +581,10 @@
                 try {
                     s = null;
                     // Block, waiting to accept new connection from a peer
-                    s = AccessController.doPrivileged(J2DoPrivHelper
-                        .acceptAction(_receiveSocket));
+                    s = AccessController.doPrivileged(J2DoPrivHelper.acceptAction(_receiveSocket));
                     if (_log.isTraceEnabled()) {
                         _log.trace(s_loc.get("tcp-received-connection",
-                            s.getInetAddress().getHostAddress()
-                                + ":" + s.getPort()));
+                            s.getInetAddress().getHostAddress() + ":" + s.getPort()));
                     }
                     ReceiveSocketHandler sh = new ReceiveSocketHandler(s);
                     Thread receiverThread = new Thread(sh);
@@ -643,17 +616,8 @@
             }
 
             // We are done listening. Interrupt any worker threads.
-            Thread worker;
-            for (Iterator<Thread> iter = _receiverThreads.iterator();
-                iter.hasNext();) {
-                worker = iter.next();
-                // FYI, the worker threads are blocked
-                // reading from the socket's InputStream. InputStreams
-                // aren't interruptable, so this interrupt isn't
-                // really going to be delivered until something breaks
-                // the InputStream.
-                worker.interrupt();
-            }
+            _receiverThreads.forEach(Thread::interrupt);
+
             synchronized (_providers) {
                 try {
                     if (_isRunning) {
@@ -666,8 +630,7 @@
                 }
                 _isRunning = false;
                 if (_log.isTraceEnabled()) {
-                    _log.trace(s_loc.get("tcp-close-listener",
-                        _port + ""));
+                    _log.trace(s_loc.get("tcp-close-listener", _port + ""));
                 }
             }
         }
@@ -676,13 +639,12 @@
          * Utility class that acts as a worker thread to receive Events
          * from broadcasters.
          */
-        private class ReceiveSocketHandler
-            implements Runnable {
+        private final class ReceiveSocketHandler implements Runnable {
 
             private InputStream _in;
             private Socket _s;
 
-            private ReceiveSocketHandler(Socket s) {
+            private ReceiveSocketHandler(final Socket s) {
                 // We are the receiving end and we don't send any messages
                 // back to the broadcaster. Turn off Nagle's so that
                 // we will send ack packets without waiting.
@@ -718,8 +680,7 @@
                         // closing its end.
                         if (_log.isTraceEnabled()) {
                             _log.trace(s_loc.get("tcp-close-socket",
-                                _s.getInetAddress().getHostAddress()
-                                    + ":" + _s.getPort()));
+                                    _s.getInetAddress().getHostAddress() + ":" + _s.getPort()));
                         }
                         break;
                     } catch (Throwable e) {
@@ -738,8 +699,7 @@
                     }
                 } catch (IOException e) {
                     _log.warn(s_loc.get("tcp-close-socket-error",
-                        _s.getInetAddress().getHostAddress() + ":"
-                            + _s.getPort()), e);
+                        _s.getInetAddress().getHostAddress() + ":" + _s.getPort()), e);
                 }
             }
 
@@ -747,18 +707,15 @@
              * Process an {@link InputStream} containing objects written
              * by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
              */
-            private void handle(InputStream in)
-                throws IOException, ClassNotFoundException {
+            private void handle(final InputStream in) throws IOException, ClassNotFoundException {
                 // This will block waiting for the next
-                ObjectInputStream ois =
-                    new Serialization.ClassResolvingObjectInputStream(in);
+                ObjectInputStream ois = new Serialization.ClassResolvingObjectInputStream(in);
 
                 long protocolVersion = ois.readLong();
                 if (protocolVersion != PROTOCOL_VERSION) {
                     if (_log.isWarnEnabled()) {
                         _log.warn(s_loc.get("tcp-wrong-version-error",
-                            _s.getInetAddress().getHostAddress() + ":"
-                                + _s.getPort()));
+                            _s.getInetAddress().getHostAddress() + ":" + _s.getPort()));
                         return;
                     }
                 }
@@ -773,19 +730,12 @@
                             + _s.getPort()));
                 }
 
-                boolean fromSelf = senderPort == _port &&
-                    Arrays.equals(senderAddress, _localhost);
-                TCPRemoteCommitProvider provider;
+                boolean fromSelf = senderPort == _port && Arrays.equals(senderAddress, _localhost);
                 synchronized (_providers) {
                     // bleair: We're iterating, but currenlty there can really
                     // only be a single provider.
-                    for (Iterator<TCPRemoteCommitProvider> iter = _providers.iterator();
-                        iter.hasNext();) {
-                        provider = iter.next();
-                        if (senderId != provider._id || !fromSelf) {
-                            provider.eventManager.fireEvent(rce);
-                        }
-                    }
+                    _providers.stream().filter(provider -> senderId != provider._id || !fromSelf).
+                            forEach(provider -> provider.eventManager.fireEvent(rce));
                 }
             }
         }
@@ -796,32 +746,29 @@
      * InetSocketAddress because it's a JDK1.4 API. This also
      * provides a wrapper around the socket(s) associated with this address.
      */
-    private class HostAddress {
+    protected class HostAddress {
 
-        private InetAddress _address;
-        private int _port;
-        private long _timeLastError; // millis
-        private boolean _isAvailable; // is peer thought to be up
-        private int _infosIssued = 0; // limit log entries
+        protected InetAddress _address;
+        protected int _port;
+        protected long _timeLastError; // millis
+        protected boolean _isAvailable; // is peer thought to be up
+        protected int _infosIssued = 0; // limit log entries
 
-        private GenericObjectPool<Socket> _socketPool; // reusable open sockets
+        protected final GenericObjectPool<Socket> _socketPool; // reusable open sockets
 
         /**
-         * Construct a new host address from a string of the form
-         * "host:port" or of the form "host".
+         * Construct a new host address from a string of the form "host:port" or of the form "host".
+         * @param host host name
          */
-        private HostAddress(String host)
-            throws UnknownHostException {
+        public HostAddress(final String host) throws UnknownHostException {
             int colon = host.indexOf(':');
             try {
                 if (colon != -1) {
                     _address = AccessController
-                        .doPrivileged(J2DoPrivHelper.getByNameAction(host
-                            .substring(0, colon)));
+                        .doPrivileged(J2DoPrivHelper.getByNameAction(host.substring(0, colon)));
                     _port = Integer.parseInt(host.substring(colon + 1));
                 } else {
-                    _address = AccessController
-                        .doPrivileged(J2DoPrivHelper.getByNameAction(host));
+                    _address = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(host));
                     _port = DEFAULT_PORT;
                 }
             } catch (PrivilegedActionException pae) {
@@ -836,14 +783,22 @@
             _isAvailable = true;
         }
 
-        private void setMaxTotal(int maxTotal) {
+        protected void setMaxTotal(final int maxTotal) {
             _socketPool.setMaxTotal(maxTotal);
         }
 
-        private void setMaxIdle(int maxIdle) {
+        protected void setMaxIdle(final int maxIdle) {
             _socketPool.setMaxIdle(maxIdle);
         }
 
+        public InetAddress getAddress() {
+            return _address;
+        }
+
+        public int getPort() {
+            return _port;
+        }
+
         public void close() {
             // Close the pool of sockets to this peer. This
             // will close all sockets in the pool.
@@ -856,7 +811,7 @@
             }
         }
 
-        private void sendUpdatePacket(byte[] bytes) {
+        protected void sendUpdatePacket(byte[] bytes) {
             if (!_isAvailable) {
                 long now = System.currentTimeMillis();
                 if (now - _timeLastError < _recoveryTimeMillis) {
@@ -872,9 +827,8 @@
                 os.flush();
 
                 if (log.isTraceEnabled()) {
-                    log.trace(s_loc.get("tcp-sent-update",
-                        _address.getHostAddress() + ":" + _port,
-                        String.valueOf(s.getLocalPort())));
+                    log.trace(s_loc.get("tcp-sent-update", 
+                            _address.getHostAddress() + ":" + _port, String.valueOf(s.getLocalPort())));
                 }
                 _isAvailable = true;
                 _infosIssued = 0;
@@ -893,8 +847,7 @@
                 if (_isAvailable) {
                     // Log a warning, the peer was up and has now gone down
                     if (log.isWarnEnabled()) {
-                        log.warn(s_loc.get("tcp-send-error",
-                            _address.getHostAddress() + ":" + _port), e);
+                        log.warn(s_loc.get("tcp-send-error", _address.getHostAddress() + ":" + _port), e);
                     }
                     _isAvailable = false;
                     // Once enough time has passed we will log another warning
@@ -910,9 +863,7 @@
                             // lower severity. This log will occur periodically
                             // for 5 times until the peer comes back.
                             if (log.isInfoEnabled()) {
-                                log.info(s_loc.get("tcp-send-still-error",
-                                    _address.getHostAddress() + ":"
-                                        + _port), e);
+                                log.info(s_loc.get("tcp-send-still-error", _address.getHostAddress() + ":" + _port), e);
                             }
                             _infosIssued++;
                         }
@@ -921,21 +872,19 @@
             }
         }
 
-        private Socket getSocket()
-            throws Exception {
+        protected Socket getSocket() throws Exception {
             return _socketPool.borrowObject();
         }
 
-        private void returnSocket(Socket s)
-            throws Exception {
+        protected void returnSocket(final Socket s) throws Exception {
             _socketPool.returnObject(s);
         }
 
-        private void clearAllSockets() {
+        protected void clearAllSockets() {
             _socketPool.clear();
         }
 
-        private void closeSocket(Socket s) {
+        protected void closeSocket(final Socket s) {
             // All sockets come from the pool.
             // This socket is no longer usable, so delete it from the
             // pool.
@@ -948,16 +897,13 @@
         /**
          * Factory for pooled sockets.
          */
-        private class SocketPoolableObjectFactory extends BasePooledObjectFactory<Socket> {
+        protected class SocketPoolableObjectFactory extends BasePooledObjectFactory<Socket> {
             @Override
             public Socket create() throws Exception {
                 try {
-                    Socket s = AccessController
-                        .doPrivileged(J2DoPrivHelper.newSocketAction(_address,
-                            _port));
+                    Socket s = AccessController.doPrivileged(J2DoPrivHelper.newSocketAction(_address, _port));
                     if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-open-connection", _address
-                            + ":" + _port, "" + s.getLocalPort()));
+                        log.trace(s_loc.get("tcp-open-connection", _address + ":" + _port, "" + s.getLocalPort()));
                     }
                     return s;
                 } catch (PrivilegedActionException pae) {
@@ -966,24 +912,46 @@
             }
 
             @Override
-            public PooledObject<Socket> wrap(Socket obj) {
+            public PooledObject<Socket> wrap(final Socket obj) {
                 return new DefaultPooledObject<>(obj);
             }
 
             @Override
-            public void destroyObject(PooledObject<Socket> p) throws Exception {
-                try {
-                    Socket s = p.getObject();
+            public void destroyObject(final PooledObject<Socket> p) throws Exception {
+                try (Socket s = p.getObject()) {
                     if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-close-sending-socket",
-                            _address + ":" + _port, "" + s.getLocalPort()));
+                        log.trace(s_loc.get("tcp-close-sending-socket", _address + ":" + _port, "" + s.getLocalPort()));
                     }
-                    s.close();
                 } catch (Exception e) {
-                    log.warn(s_loc.get("tcp-close-socket-error",
-                        _address.getHostAddress() + ":" + _port), e);
+                    log.warn(s_loc.get("tcp-close-socket-error", _address.getHostAddress() + ":" + _port), e);
                 }
             }
         }
+
+        @Override
+        public int hashCode() {
+            int hash = 7;
+            hash = 37 * hash + Objects.hashCode(this._address);
+            hash = 37 * hash + this._port;
+            return hash;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            final HostAddress other = (HostAddress) obj;
+            if (this._port != other._port) {
+                return false;
+            }
+            return Objects.equals(this._address, other._address);
+        }
     }
 }
diff --git a/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties b/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
index fc40679..6549c4a 100644
--- a/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
+++ b/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
@@ -81,6 +81,9 @@
 	port "{0}".
 tcp-address-asself: Identified address of "{0}", which is equal to ourself.
 tcp-address-set: Configured to send to peer "{0}"
+dyntcp-address-not-set: Already configured to send to peer "{0}"
+dyntcp-address-unset: Removed peer "{0}"
+dyntcp-updater-error: Error while updating hosts
 tcp-received-event: Received event from peer "{0}"
 tcp-open-connection: Creating new socket connection to "{0}", using local port \
 	"{1}".
diff --git a/openjpa-kubernetes/pom.xml b/openjpa-kubernetes/pom.xml
new file mode 100644
index 0000000..dc8ed27
--- /dev/null
+++ b/openjpa-kubernetes/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!--
+    Please keep the project tag on one line to avoid confusing
+    the release plugin.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.openjpa</groupId>
+    <artifactId>openjpa-parent</artifactId>
+    <version>3.1.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>openjpa-kubernetes</artifactId>
+  <packaging>jar</packaging>
+  <name>OpenJPA Kubernetes</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.openjpa</groupId>
+      <artifactId>openjpa-kernel</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-server-mock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java b/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java
new file mode 100644
index 0000000..7b7d945
--- /dev/null
+++ b/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event.kubernetes;
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.openjpa.event.DynamicTCPRemoteCommitProvider;
+import org.apache.openjpa.lib.util.Localizer;
+
+public class KubernetesTCPRemoteCommitProvider extends DynamicTCPRemoteCommitProvider {
+
+    private static final Localizer s_loc = Localizer.forPackage(KubernetesTCPRemoteCommitProvider.class);
+
+    private String _namespace = "<namespace>";
+
+    private String _label = "<label>";
+
+    public KubernetesTCPRemoteCommitProvider() throws UnknownHostException {
+        super();
+    }
+
+    public String getNamespace() {
+        return _namespace;
+    }
+
+    public void setNamespace(final String namespace) {
+        this._namespace = namespace;
+    }
+
+    public String getLabel() {
+        return _label;
+    }
+
+    public void setLabel(final String label) {
+        this._label = label;
+    }
+
+    protected KubernetesClient kubernetesClient() throws KubernetesClientException {
+        return new DefaultKubernetesClient();
+    }
+
+    @Override
+    protected List<String> fetchDynamicAddresses() {
+        List<String> podIPs = new ArrayList<>();
+
+        try (KubernetesClient client = kubernetesClient()) {
+            podIPs.addAll(client.pods().inNamespace(_namespace).withLabel(_label).list().
+                    getItems().stream().
+                    map(pod -> pod.getStatus().getPodIP()).
+                    collect(Collectors.toList()));
+
+            if (log.isTraceEnabled()) {
+                log.trace(s_loc.get("kubernetestcp-pods", podIPs));
+            }
+        } catch (KubernetesClientException e) {
+            if (log.isFatalEnabled()) {
+                log.fatal(s_loc.get("kubernetestcp-error"), e);
+            }
+        }
+
+        return podIPs;
+    }
+}
diff --git a/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties b/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties
new file mode 100644
index 0000000..47fac21
--- /dev/null
+++ b/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+kubernetestcp-error: Error while setting up Kubernetes client
+kubernetestcp-pods: Pods found
diff --git a/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java b/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java
new file mode 100644
index 0000000..d8e1e1f
--- /dev/null
+++ b/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event.kubernetes;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import java.lang.reflect.Field;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.openjpa.event.TCPRemoteCommitProvider;
+import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.log.SLF4JLogFactory;
+import org.jmock.Expectations;
+import org.jmock.integration.junit4.JUnitRuleMockery;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KubernetesTCPRemoteCommitProviderTest {
+
+    private static final String NAMESPACE = "ns1";
+
+    private static final String LABEL = "testKey";
+
+    @Rule
+    public JUnitRuleMockery context = new JUnitRuleMockery();
+
+    @Rule
+    public KubernetesServer server = new KubernetesServer(true, true);
+
+    private Pod pod1;
+
+    private Pod pod2;
+
+    private Pod pod3;
+
+    private Pod pod4;
+
+    @Before
+    public void setupKubernetes() {
+        KubernetesClient client = server.getClient();
+
+        pod1 = new PodBuilder().
+                withNewMetadata().
+                withName("pod1").
+                addToLabels(LABEL, "value1").
+                endMetadata().
+                withStatus(new PodStatusBuilder().withPodIP("1.1.1.1").build()).
+                build();
+        client.pods().inNamespace(NAMESPACE).create(pod1);
+
+        pod2 = new PodBuilder().
+                withNewMetadata().
+                withName("pod2").
+                addToLabels(LABEL, "value2").
+                endMetadata().
+                withStatus(new PodStatusBuilder().withPodIP("2.2.2.2").build()).
+                build();
+        client.pods().inNamespace(NAMESPACE).create(pod2);
+
+        pod3 = new PodBuilder().
+                withNewMetadata().
+                withName("pod3").
+                endMetadata().
+                withStatus(new PodStatusBuilder().withPodIP("3.3.3.3").build()).
+                build();
+        client.pods().inNamespace("ns2").create(pod3);
+
+        pod4 = new PodBuilder().
+                withNewMetadata().
+                withName("pod4").
+                addToLabels("other", "value1").
+                endMetadata().
+                withStatus(new PodStatusBuilder().withPodIP("4.4.4.4").build()).
+                build();
+        client.pods().inNamespace(NAMESPACE).create(pod4);
+
+        PodList podList = client.pods().inNamespace(NAMESPACE).withLabel(LABEL).list();
+        assertNotNull(podList);
+        assertEquals(2, podList.getItems().size());
+        assertTrue(podList.getItems().contains(pod1));
+        assertTrue(podList.getItems().contains(pod2));
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<String> getAddresses(final KubernetesTCPRemoteCommitProvider rcp)
+            throws NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
+
+        Field _addresses = TCPRemoteCommitProvider.class.getDeclaredField("_addresses");
+        _addresses.setAccessible(true);
+
+        return new ArrayList<>((List<Object>) _addresses.get(rcp)).stream().
+                map(ReflectionToStringBuilder::toString).
+                map(address -> StringUtils.substringAfter(address, "_address=/")).
+                map(address -> StringUtils.substringBefore(address, ",")).
+                collect(Collectors.toList());
+    }
+
+    @Test
+    public void addresses() throws UnknownHostException, NoSuchFieldException,
+            IllegalArgumentException, IllegalAccessException, InterruptedException {
+
+        // prepare KubernetesTCPRemoteCommitProvider instance, inject mocked Kubernetes client
+        KubernetesTCPRemoteCommitProvider rcp = new KubernetesTCPRemoteCommitProvider() {
+
+            @Override
+            protected KubernetesClient kubernetesClient() throws KubernetesClientException {
+                return server.getClient();
+            }
+        };
+        rcp.setNamespace(NAMESPACE);
+        rcp.setLabel(LABEL);
+        rcp.setCacheDurationMillis(500);
+
+        // mock OpenJPA configuration
+        Configuration conf = context.mock(Configuration.class);
+        context.checking(new Expectations() {
+
+            {
+                oneOf(conf).getLog(with(any(String.class)));
+                will(returnValue(new SLF4JLogFactory().getLog("")));
+            }
+        });
+        rcp.setConfiguration(conf);
+
+        // finalize
+        rcp.endConfiguration();
+
+        // expect to find remote addresses of matching pods
+        List<String> addresses = getAddresses(rcp);
+        assertEquals(2, addresses.size());
+        assertTrue(addresses.contains(pod1.getStatus().getPodIP()));
+        assertTrue(addresses.contains(pod2.getStatus().getPodIP()));
+
+        Thread.sleep(500);
+
+        addresses = getAddresses(rcp);
+        assertEquals(2, addresses.size());
+        assertTrue(addresses.contains(pod1.getStatus().getPodIP()));
+        assertTrue(addresses.contains(pod2.getStatus().getPodIP()));
+    }
+}
diff --git a/openjpa/pom.xml b/openjpa/pom.xml
index c3ed6d5..7a93f96 100644
--- a/openjpa/pom.xml
+++ b/openjpa/pom.xml
@@ -83,6 +83,7 @@
                                     <include>org.apache.openjpa:openjpa-persistence-jdbc</include>
                                     <include>org.apache.openjpa:openjpa-xmlstore</include>
                                     <include>org.apache.openjpa:openjpa-slice</include>
+                                    <include>org.apache.openjpa:openjpa-kubernetes</include>
                                 </includes>
                             </artifactSet>
                             <!-- OpenJPA unique META-INF setup -->
@@ -234,5 +235,10 @@
             <artifactId>openjpa-slice</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-kubernetes</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/pom.xml b/pom.xml
index bc55e4d..8b10a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
         <mssql.connector.version>7.2.1</mssql.connector.version>
 
         <!-- other common versions -->
+        <kubernetes-client.version>4.7.0</kubernetes-client.version>
         <slf4j.version>1.7.23</slf4j.version>
         <!-- Compile Java source/target class level -->
         <compile.class.source>${java.class.version}</compile.class.source>
@@ -174,6 +175,7 @@
         <module>openjpa-xmlstore</module>
         <module>openjpa-slice</module>
         <module>openjpa-jest</module>
+        <module>openjpa-kubernetes</module>
         <module>openjpa</module>
         <module>openjpa-project</module>
         <module>openjpa-examples</module>
@@ -1790,6 +1792,16 @@
                 <version>2.11.1</version>
             </dependency>
             <dependency>
+              <groupId>io.fabric8</groupId>
+              <artifactId>kubernetes-client</artifactId>
+              <version>${kubernetes-client.version}</version>
+            </dependency>
+            <dependency>
+              <groupId>io.fabric8</groupId>
+              <artifactId>kubernetes-server-mock</artifactId>
+              <version>${kubernetes-client.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>${slf4j.version}</version>