[OPENJPA-2801] Introducing KubernetesTCPRemoteCommitProvider
diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java b/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java
new file mode 100644
index 0000000..bb482d0
--- /dev/null
+++ b/openjpa-kernel/src/main/java/org/apache/openjpa/event/DynamicTCPRemoteCommitProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+import org.apache.openjpa.lib.util.J2DoPrivHelper;
+
+public abstract class DynamicTCPRemoteCommitProvider extends TCPRemoteCommitProvider {
+
+ private int _cacheDurationMillis = 30000;
+
+ public DynamicTCPRemoteCommitProvider() throws UnknownHostException {
+ super();
+ }
+
+ public int getCacheDurationMillis() {
+ return _cacheDurationMillis;
+ }
+
+ public void setCacheDurationMillis(final int _cacheDurationMillis) {
+ this._cacheDurationMillis = _cacheDurationMillis;
+ }
+
+ @Override
+ public final void setAddresses(final String names) throws UnknownHostException {
+ throw new UnknownHostException("Do not set Addresses on this instance; "
+ + "did you expect " + TCPRemoteCommitProvider.class.getSimpleName() + " ?");
+ }
+
+ @Override
+ public void endConfiguration() {
+ TcpAddressesUpdater updater = new TcpAddressesUpdater();
+ updater.run();
+
+ Timer timer = new Timer(true);
+ timer.scheduleAtFixedRate(updater, 0, _cacheDurationMillis);
+
+ super.endConfiguration();
+ }
+
+ protected abstract List<String> fetchDynamicAddresses();
+
+ private class TcpAddressesUpdater extends TimerTask {
+
+ @Override
+ public void run() {
+ List<String> dynamicAddresses = fetchDynamicAddresses();
+
+ _addressesLock.lock();
+ try {
+ String localhostAddress = InetAddress.getLocalHost().getHostAddress();
+
+ for (String dynamic : dynamicAddresses) {
+ InetAddress tmpAddress = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(dynamic));
+
+ if (localhostAddress.equals(dynamic)) {
+ // This string matches the hostname for for ourselves, we
+ // don't actually need to send ourselves messages.
+ if (log.isTraceEnabled()) {
+ log.trace(s_loc.get("tcp-address-asself", tmpAddress.getHostAddress() + ":" + _port));
+ }
+ } else {
+ HostAddress podAddress = new HostAddress(dynamic);
+ if (_addresses.contains(podAddress)) {
+ if (log.isTraceEnabled()) {
+ log.trace(s_loc.get("dyntcp-address-not-set",
+ podAddress.getAddress().getHostAddress() + ":" + podAddress.getPort()));
+ }
+ } else {
+ _addresses.add(podAddress);
+
+ if (log.isTraceEnabled()) {
+ log.trace(s_loc.get("dyntcp-address-set",
+ podAddress.getAddress().getHostAddress() + ":" + podAddress.getPort()));
+ }
+ }
+ }
+ }
+
+ List<HostAddress> toCloseAndRemove = _addresses.stream().
+ filter(address -> !dynamicAddresses.contains(address.getAddress().getHostAddress())).
+ collect(Collectors.toList());
+ toCloseAndRemove.forEach(address -> {
+ address.close();
+ _addresses.remove(address);
+
+ if (log.isTraceEnabled()) {
+ log.trace(s_loc.get("tcp-address-unset",
+ address.getAddress().getHostAddress() + ":" + address.getPort()));
+ }
+ });
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error(s_loc.get("dyntcp-updater-error"), e);
+ }
+ } finally {
+ _addressesLock.unlock();
+ }
+ }
+ }
+}
diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java b/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
index b7f8b9f..ab4793c 100644
--- a/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
+++ b/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
@@ -38,10 +38,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
@@ -75,8 +75,7 @@
private static final int DEFAULT_PORT = 5636;
- private static final Localizer s_loc = Localizer.forPackage
- (TCPRemoteCommitProvider.class);
+ protected static final Localizer s_loc = Localizer.forPackage(TCPRemoteCommitProvider.class);
private static long s_idSequence = System.currentTimeMillis();
// A map of listen ports to listeners in this JVM. We might
@@ -84,22 +83,20 @@
// that is not currently possible in a single JVM.
private static final Map<String, TCPPortListener> s_portListenerMap = new HashMap<>();
- private long _id;
- private byte[] _localhost;
- private int _port = DEFAULT_PORT;
+ private final long _id;
+ private final byte[] _localhost;
+ protected int _port = DEFAULT_PORT;
private int _maxTotal = 2;
private int _maxIdle = 2;
private int _recoveryTimeMillis = 15000;
private TCPPortListener _listener;
- private BroadcastQueue _broadcastQueue = new BroadcastQueue();
- private final List<BroadcastWorkerThread> _broadcastThreads = Collections.synchronizedList(
- new LinkedList<>());
+ private final BroadcastQueue _broadcastQueue = new BroadcastQueue();
+ private final List<BroadcastWorkerThread> _broadcastThreads = Collections.synchronizedList(new LinkedList<>());
- private List<HostAddress> _addresses = new ArrayList<>();
- private ReentrantLock _addressesLock;
+ protected List<HostAddress> _addresses = new ArrayList<>();
+ protected final ReentrantLock _addressesLock;
- public TCPRemoteCommitProvider()
- throws UnknownHostException {
+ public TCPRemoteCommitProvider() throws UnknownHostException {
// obtain a unique ID.
synchronized (TCPRemoteCommitProvider.class) {
_id = s_idSequence++;
@@ -112,84 +109,90 @@
}
/**
- * The port that this provider should listen on.
+ * @return the port that this provider should listen on.
*/
public int getPort() {
return _port;
}
/**
- * The port that this provider should listen on. Set once only.
+ * Set the port that this provider should listen on. Set once only.
+ *
+ * @param port the port that this provider should listen on
*/
- public void setPort(int port) {
+ public void setPort(final int port) {
_port = port;
}
/**
- * The number of milliseconds to wait before retrying
- * to reconnect to a peer after it becomes unreachable.
+ * Set the number of milliseconds to wait before retrying to reconnect to a peer after it becomes unreachable.
+ *
+ * @param recoverytime the number of milliseconds to wait before retrying to reconnect to a peer after it becomes
+ * unreachable
*/
- public void setRecoveryTimeMillis(int recoverytime) {
+ public void setRecoveryTimeMillis(final int recoverytime) {
_recoveryTimeMillis = recoverytime;
}
/**
- * The number of milliseconds to wait before retrying
- * to reconnect to a peer after it becomes unreachable.
+ * @return the number of milliseconds to wait before retrying to reconnect to a peer after it becomes unreachable.
*/
public int getRecoveryTimeMillis() {
return _recoveryTimeMillis;
}
/**
- * The maximum number of sockets that this provider can
- * simetaneously open to each peer in the cluster.
+ * Set the maximum number of sockets that this provider can simultaneously open to each peer in the cluster.
*
- * @deprecated please use {@link TCPRemoteCommitProvider#setMaxTotal(int)} instead
+ * @param maxActive the maximum total number of sockets that this provider can simultaneously open to each peer in
+ * the cluster. * @deprecated please use {@link TCPRemoteCommitProvider#setMaxTotal(int)} instead
*/
@Deprecated
- public void setMaxActive(int maxActive) {
+ public void setMaxActive(final int maxActive) {
log.warn("This method should not be used");
_maxTotal = maxActive;
}
/**
- * The maximum total number of sockets that this provider can
- * simetaneously open to each peer in the cluster.
+ * Set the maximum total number of sockets that this provider can simultaneously open to each peer in the cluster.
+ *
+ * @param maxTotal the maximum total number of sockets that this provider can simultaneously open to each peer in
+ * the cluster.
*/
- public void setMaxTotal(int maxTotal) {
+ public void setMaxTotal(final int maxTotal) {
_maxTotal = maxTotal;
}
/**
- * The maximum number of sockets that this provider can
- * simetaneously open to each peer in the cluster.
+ * @return the maximum number of sockets that this provider can simultaneously open to each peer in the cluster.
*/
public int getMaxTotal() {
return _maxTotal;
}
/**
- * The number of idle sockets that this provider can keep open
- * to each peer in the cluster.
+ * Set the number of idle sockets that this provider can keep open to each peer in the cluster.
+ *
+ * @param maxIdle the number of idle sockets that this provider can keep open to each peer in the cluster
*/
- public void setMaxIdle(int maxIdle) {
+ public void setMaxIdle(final int maxIdle) {
_maxIdle = maxIdle;
}
/**
- * The number of idle sockets that this provider can keep open
- * to each peer in the cluster.
+ * @return the number of idle sockets that this provider can keep open to each peer in the cluster.
*/
public int getMaxIdle() {
return _maxIdle;
}
/**
- * The number of worker threads that are used for
- * transmitting packets to peers in the cluster.
+ * Set the number of worker threads that are used for transmitting packets to peers in the cluster.
+ *
+ * @param numBroadcastThreads the number of worker threads that are used for transmitting packets to peers in the
+ * cluster
*/
- public void setNumBroadcastThreads(int numBroadcastThreads) {
+ public void setNumBroadcastThreads(final int numBroadcastThreads) {
synchronized (_broadcastThreads) {
int cur = _broadcastThreads.size();
if (cur > numBroadcastThreads) {
@@ -212,36 +215,33 @@
}
/**
- * The number of worker threads that are used for
- * transmitting packets to peers in the cluster.
+ * @return the number of worker threads that are used for transmitting packets to peers in the cluster.
*/
public int getNumBroadcastThreads() {
return _broadcastThreads.size();
}
/**
- * Sets the list of addresses of peers to which this provider will
- * send events to. The peers are semicolon-separated <code>names</code>
- * list in the form of "myhost1:portA;myhost2:portB".
+ * Sets the list of addresses of peers to which this provider will send events to.
+ * The peers are semicolon-separated <code>names</code> list in the form of "myhost1:portA;myhost2:portB".
+ *
+ * @param names the list of addresses of peers to which this provider will send events to
+ * @throws UnknownHostException in case peer name cannot be resolved
*/
- public void setAddresses(String names)
- throws UnknownHostException {
- // NYI. Could look for equivalence of addresses and avoid
- // changing those that didn't change.
+ public void setAddresses(final String names) throws UnknownHostException {
+ // NYI. Could look for equivalence of addresses and avoid changing those that didn't change.
_addressesLock.lock();
try {
- for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
- iter.next().close();
- }
+ _addresses.forEach(HostAddress::close);
+
String[] toks = StringUtil.split(names, ";", 0);
_addresses = new ArrayList<>(toks.length);
InetAddress localhost = InetAddress.getLocalHost();
String localhostName = localhost.getHostName();
- for (int i = 0; i < toks.length; i++) {
- String host = toks[i];
+ for (String host : toks) {
String hostname;
int tmpPort;
int colon = host.indexOf(':');
@@ -252,9 +252,8 @@
hostname = host;
tmpPort = DEFAULT_PORT;
}
- InetAddress tmpAddress = AccessController
- .doPrivileged(J2DoPrivHelper.getByNameAction(hostname));
-
+ InetAddress tmpAddress = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(hostname));
+
// bleair: For each address we would rather make use of
// the jdk1.4 isLinkLocalAddress () || isLoopbackAddress ().
// (Though in practice on win32 they don't work anyways!)
@@ -265,16 +264,14 @@
// This string matches the hostname for for ourselves, we
// don't actually need to send ourselves messages.
if (log.isTraceEnabled()) {
- log.trace(s_loc.get("tcp-address-asself",
- tmpAddress.getHostName() + ":" + tmpPort));
+ log.trace(s_loc.get("tcp-address-asself", tmpAddress.getHostName() + ":" + tmpPort));
}
} else {
HostAddress newAddress = new HostAddress(host);
_addresses.add(newAddress);
if (log.isTraceEnabled()) {
- log.trace(s_loc.get("tcp-address-set",
- newAddress._address.getHostName() + ":"
- + newAddress._port));
+ log.trace(s_loc.get("tcp-address-set",
+ newAddress._address.getHostName() + ":" + newAddress._port));
}
}
}
@@ -296,25 +293,21 @@
super.endConfiguration();
synchronized (s_portListenerMap) {
// see if a listener exists for this port.
- _listener = s_portListenerMap.get
- (String.valueOf(_port));
+ _listener = s_portListenerMap.get(String.valueOf(_port));
- if (_listener == null ||
- (!_listener.isRunning() && _listener._port == _port)) {
+ if (_listener == null || (!_listener.isRunning() && _listener._port == _port)) {
try {
_listener = new TCPPortListener(_port, log);
_listener.listen();
s_portListenerMap.put(String.valueOf(_port), _listener);
} catch (Exception e) {
- throw new GeneralException(s_loc.get("tcp-init-exception",
- String.valueOf(_port)), e).setFatal(true);
+ throw new GeneralException(s_loc.get("tcp-init-exception", String.valueOf(_port)), e).
+ setFatal(true);
}
} else if (_listener.isRunning()) {
if (_listener._port != _port) {
// this really shouldn't be able to happen.
- throw new GeneralException(s_loc.get
- ("tcp-not-equal", String.valueOf(_port))).
- setFatal(true);
+ throw new GeneralException(s_loc.get("tcp-not-equal", String.valueOf(_port))).setFatal(true);
}
} else {
throw new InternalException(s_loc.get("tcp-listener-broken"));
@@ -324,15 +317,11 @@
_addressesLock.lock();
try {
- HostAddress curAddress;
- for (Iterator<HostAddress> iter = _addresses.iterator();
- iter.hasNext();) {
- curAddress = iter.next();
+ _addresses.forEach(curAddress -> {
curAddress.setMaxTotal(_maxTotal);
curAddress.setMaxIdle(_maxIdle);
- }
- }
- finally {
+ });
+ } finally {
_addressesLock.unlock();
}
}
@@ -345,11 +334,10 @@
private static final long PROTOCOL_VERSION = 0x1428acff;
@Override
- public void broadcast(RemoteCommitEvent event) {
- try {
- // build a packet notifying other JVMs of object changes.
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
+ public void broadcast(final RemoteCommitEvent event) {
+ // build a packet notifying other JVMs of object changes.
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);) {
oos.writeLong(PROTOCOL_VERSION);
oos.writeLong(_id);
@@ -376,12 +364,10 @@
* Sends a change notification packet to other machines in this
* provider cluster.
*/
- private void sendUpdatePacket(byte[] bytes) {
+ private void sendUpdatePacket(final byte[] bytes) {
_addressesLock.lock();
try {
- for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
- iter.next().sendUpdatePacket(bytes);
- }
+ _addresses.forEach(address -> address.sendUpdatePacket(bytes));
} finally {
_addressesLock.unlock();
}
@@ -407,9 +393,7 @@
_addressesLock.lock();
try {
- for (Iterator<HostAddress> iter = _addresses.iterator(); iter.hasNext();) {
- iter.next().close();
- }
+ _addresses.forEach(HostAddress::close);
} finally {
_addressesLock.unlock();
}
@@ -422,7 +406,7 @@
*/
private static class BroadcastQueue {
- private LinkedList<byte[]> _packetQueue = new LinkedList<>();
+ private final LinkedList<byte[]> _packetQueue = new LinkedList<>();
private boolean _closed = false;
public synchronized void close() {
@@ -434,7 +418,7 @@
return _closed;
}
- public synchronized void addPacket(byte[] bytes) {
+ public synchronized void addPacket(final byte[] bytes) {
_packetQueue.addLast(bytes);
notify();
}
@@ -443,8 +427,7 @@
* @return the bytes defining the packet to process, or
* <code>null</code> if the queue is empty.
*/
- public synchronized byte[] removePacket()
- throws InterruptedException {
+ public synchronized byte[] removePacket() throws InterruptedException {
// only wait if the queue is still open. This allows processing
// of events in the queue to continue, while avoiding sleeping
// during shutdown.
@@ -487,7 +470,7 @@
remove();
}
- public void setRunning(boolean keepRunning) {
+ public void setRunning(final boolean keepRunning) {
_keepRunning = keepRunning;
}
@@ -499,8 +482,7 @@
/**
* Responsible for listening for incoming packets and processing them.
*/
- private static class TCPPortListener
- implements Runnable {
+ private static final class TCPPortListener implements Runnable {
private final Log _log;
private ServerSocket _receiveSocket;
@@ -511,7 +493,7 @@
/**
* Cache the local IP address
*/
- private byte[] _localhost;
+ private final byte[] _localhost;
/**
* The port that this listener should listen on. Configured
@@ -527,21 +509,18 @@
/**
* Construct a new TCPPortListener configured to use the specified port.
*/
- private TCPPortListener(int port, Log log)
- throws IOException {
+ private TCPPortListener(final int port, final Log log) throws IOException {
_port = port;
_log = log;
try {
- _receiveSocket = AccessController
- .doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
+ _receiveSocket = AccessController.doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
_localhost = InetAddress.getLocalHost().getAddress();
if (_log.isTraceEnabled()) {
- _log.info(s_loc.get("tcp-start-listener",
- String.valueOf(_port)));
+ _log.info(s_loc.get("tcp-start-listener", String.valueOf(_port)));
}
}
@@ -552,30 +531,26 @@
}
/**
- * All providers added here will be notified of any incoming
- * provider messages. There will be one of these per
+ * All providers added here will be notified of any incoming provider messages. There will be one of these per
* BrokerFactory in a given JVM.
- * {@link TCPRemoteCommitProvider#endConfiguration} invokes
- * <code>addProvider</code> with <code>this</code> upon
+ * {@link TCPRemoteCommitProvider#endConfiguration} invokes <code>addProvider</code> with <code>this</code> upon
* completion of configuration.
*/
- private void addProvider(TCPRemoteCommitProvider provider) {
+ private void addProvider(final TCPRemoteCommitProvider provider) {
synchronized (_providers) {
_providers.add(provider);
}
}
/**
- * Remove a provider from the list of providers to notify of
- * commit events.
+ * Remove a provider from the list of providers to notify of commit events.
*/
- private synchronized void removeProvider
- (TCPRemoteCommitProvider provider) {
+ private synchronized void removeProvider(final TCPRemoteCommitProvider provider) {
synchronized (_providers) {
_providers.remove(provider);
// if the provider list is empty, shut down the thread.
- if (_providers.size() == 0) {
+ if (_providers.isEmpty()) {
_isRunning = false;
try {
_receiveSocket.close();
@@ -606,12 +581,10 @@
try {
s = null;
// Block, waiting to accept new connection from a peer
- s = AccessController.doPrivileged(J2DoPrivHelper
- .acceptAction(_receiveSocket));
+ s = AccessController.doPrivileged(J2DoPrivHelper.acceptAction(_receiveSocket));
if (_log.isTraceEnabled()) {
_log.trace(s_loc.get("tcp-received-connection",
- s.getInetAddress().getHostAddress()
- + ":" + s.getPort()));
+ s.getInetAddress().getHostAddress() + ":" + s.getPort()));
}
ReceiveSocketHandler sh = new ReceiveSocketHandler(s);
Thread receiverThread = new Thread(sh);
@@ -643,17 +616,8 @@
}
// We are done listening. Interrupt any worker threads.
- Thread worker;
- for (Iterator<Thread> iter = _receiverThreads.iterator();
- iter.hasNext();) {
- worker = iter.next();
- // FYI, the worker threads are blocked
- // reading from the socket's InputStream. InputStreams
- // aren't interruptable, so this interrupt isn't
- // really going to be delivered until something breaks
- // the InputStream.
- worker.interrupt();
- }
+ _receiverThreads.forEach(Thread::interrupt);
+
synchronized (_providers) {
try {
if (_isRunning) {
@@ -666,8 +630,7 @@
}
_isRunning = false;
if (_log.isTraceEnabled()) {
- _log.trace(s_loc.get("tcp-close-listener",
- _port + ""));
+ _log.trace(s_loc.get("tcp-close-listener", _port + ""));
}
}
}
@@ -676,13 +639,12 @@
* Utility class that acts as a worker thread to receive Events
* from broadcasters.
*/
- private class ReceiveSocketHandler
- implements Runnable {
+ private final class ReceiveSocketHandler implements Runnable {
private InputStream _in;
private Socket _s;
- private ReceiveSocketHandler(Socket s) {
+ private ReceiveSocketHandler(final Socket s) {
// We are the receiving end and we don't send any messages
// back to the broadcaster. Turn off Nagle's so that
// we will send ack packets without waiting.
@@ -718,8 +680,7 @@
// closing its end.
if (_log.isTraceEnabled()) {
_log.trace(s_loc.get("tcp-close-socket",
- _s.getInetAddress().getHostAddress()
- + ":" + _s.getPort()));
+ _s.getInetAddress().getHostAddress() + ":" + _s.getPort()));
}
break;
} catch (Throwable e) {
@@ -738,8 +699,7 @@
}
} catch (IOException e) {
_log.warn(s_loc.get("tcp-close-socket-error",
- _s.getInetAddress().getHostAddress() + ":"
- + _s.getPort()), e);
+ _s.getInetAddress().getHostAddress() + ":" + _s.getPort()), e);
}
}
@@ -747,18 +707,15 @@
* Process an {@link InputStream} containing objects written
* by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
*/
- private void handle(InputStream in)
- throws IOException, ClassNotFoundException {
+ private void handle(final InputStream in) throws IOException, ClassNotFoundException {
// This will block waiting for the next
- ObjectInputStream ois =
- new Serialization.ClassResolvingObjectInputStream(in);
+ ObjectInputStream ois = new Serialization.ClassResolvingObjectInputStream(in);
long protocolVersion = ois.readLong();
if (protocolVersion != PROTOCOL_VERSION) {
if (_log.isWarnEnabled()) {
_log.warn(s_loc.get("tcp-wrong-version-error",
- _s.getInetAddress().getHostAddress() + ":"
- + _s.getPort()));
+ _s.getInetAddress().getHostAddress() + ":" + _s.getPort()));
return;
}
}
@@ -773,19 +730,12 @@
+ _s.getPort()));
}
- boolean fromSelf = senderPort == _port &&
- Arrays.equals(senderAddress, _localhost);
- TCPRemoteCommitProvider provider;
+ boolean fromSelf = senderPort == _port && Arrays.equals(senderAddress, _localhost);
synchronized (_providers) {
// bleair: We're iterating, but currenlty there can really
// only be a single provider.
- for (Iterator<TCPRemoteCommitProvider> iter = _providers.iterator();
- iter.hasNext();) {
- provider = iter.next();
- if (senderId != provider._id || !fromSelf) {
- provider.eventManager.fireEvent(rce);
- }
- }
+ _providers.stream().filter(provider -> senderId != provider._id || !fromSelf).
+ forEach(provider -> provider.eventManager.fireEvent(rce));
}
}
}
@@ -796,32 +746,29 @@
* InetSocketAddress because it's a JDK1.4 API. This also
* provides a wrapper around the socket(s) associated with this address.
*/
- private class HostAddress {
+ protected class HostAddress {
- private InetAddress _address;
- private int _port;
- private long _timeLastError; // millis
- private boolean _isAvailable; // is peer thought to be up
- private int _infosIssued = 0; // limit log entries
+ protected InetAddress _address;
+ protected int _port;
+ protected long _timeLastError; // millis
+ protected boolean _isAvailable; // is peer thought to be up
+ protected int _infosIssued = 0; // limit log entries
- private GenericObjectPool<Socket> _socketPool; // reusable open sockets
+ protected final GenericObjectPool<Socket> _socketPool; // reusable open sockets
/**
- * Construct a new host address from a string of the form
- * "host:port" or of the form "host".
+ * Construct a new host address from a string of the form "host:port" or of the form "host".
+ * @param host host name
*/
- private HostAddress(String host)
- throws UnknownHostException {
+ public HostAddress(final String host) throws UnknownHostException {
int colon = host.indexOf(':');
try {
if (colon != -1) {
_address = AccessController
- .doPrivileged(J2DoPrivHelper.getByNameAction(host
- .substring(0, colon)));
+ .doPrivileged(J2DoPrivHelper.getByNameAction(host.substring(0, colon)));
_port = Integer.parseInt(host.substring(colon + 1));
} else {
- _address = AccessController
- .doPrivileged(J2DoPrivHelper.getByNameAction(host));
+ _address = AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(host));
_port = DEFAULT_PORT;
}
} catch (PrivilegedActionException pae) {
@@ -836,14 +783,22 @@
_isAvailable = true;
}
- private void setMaxTotal(int maxTotal) {
+ protected void setMaxTotal(final int maxTotal) {
_socketPool.setMaxTotal(maxTotal);
}
- private void setMaxIdle(int maxIdle) {
+ protected void setMaxIdle(final int maxIdle) {
_socketPool.setMaxIdle(maxIdle);
}
+ public InetAddress getAddress() {
+ return _address;
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
public void close() {
// Close the pool of sockets to this peer. This
// will close all sockets in the pool.
@@ -856,7 +811,7 @@
}
}
- private void sendUpdatePacket(byte[] bytes) {
+ protected void sendUpdatePacket(byte[] bytes) {
if (!_isAvailable) {
long now = System.currentTimeMillis();
if (now - _timeLastError < _recoveryTimeMillis) {
@@ -872,9 +827,8 @@
os.flush();
if (log.isTraceEnabled()) {
- log.trace(s_loc.get("tcp-sent-update",
- _address.getHostAddress() + ":" + _port,
- String.valueOf(s.getLocalPort())));
+ log.trace(s_loc.get("tcp-sent-update",
+ _address.getHostAddress() + ":" + _port, String.valueOf(s.getLocalPort())));
}
_isAvailable = true;
_infosIssued = 0;
@@ -893,8 +847,7 @@
if (_isAvailable) {
// Log a warning, the peer was up and has now gone down
if (log.isWarnEnabled()) {
- log.warn(s_loc.get("tcp-send-error",
- _address.getHostAddress() + ":" + _port), e);
+ log.warn(s_loc.get("tcp-send-error", _address.getHostAddress() + ":" + _port), e);
}
_isAvailable = false;
// Once enough time has passed we will log another warning
@@ -910,9 +863,7 @@
// lower severity. This log will occur periodically
// for 5 times until the peer comes back.
if (log.isInfoEnabled()) {
- log.info(s_loc.get("tcp-send-still-error",
- _address.getHostAddress() + ":"
- + _port), e);
+ log.info(s_loc.get("tcp-send-still-error", _address.getHostAddress() + ":" + _port), e);
}
_infosIssued++;
}
@@ -921,21 +872,19 @@
}
}
- private Socket getSocket()
- throws Exception {
+ protected Socket getSocket() throws Exception {
return _socketPool.borrowObject();
}
- private void returnSocket(Socket s)
- throws Exception {
+ protected void returnSocket(final Socket s) throws Exception {
_socketPool.returnObject(s);
}
- private void clearAllSockets() {
+ protected void clearAllSockets() {
_socketPool.clear();
}
- private void closeSocket(Socket s) {
+ protected void closeSocket(final Socket s) {
// All sockets come from the pool.
// This socket is no longer usable, so delete it from the
// pool.
@@ -948,16 +897,13 @@
/**
* Factory for pooled sockets.
*/
- private class SocketPoolableObjectFactory extends BasePooledObjectFactory<Socket> {
+ protected class SocketPoolableObjectFactory extends BasePooledObjectFactory<Socket> {
@Override
public Socket create() throws Exception {
try {
- Socket s = AccessController
- .doPrivileged(J2DoPrivHelper.newSocketAction(_address,
- _port));
+ Socket s = AccessController.doPrivileged(J2DoPrivHelper.newSocketAction(_address, _port));
if (log.isTraceEnabled()) {
- log.trace(s_loc.get("tcp-open-connection", _address
- + ":" + _port, "" + s.getLocalPort()));
+ log.trace(s_loc.get("tcp-open-connection", _address + ":" + _port, "" + s.getLocalPort()));
}
return s;
} catch (PrivilegedActionException pae) {
@@ -966,24 +912,46 @@
}
@Override
- public PooledObject<Socket> wrap(Socket obj) {
+ public PooledObject<Socket> wrap(final Socket obj) {
return new DefaultPooledObject<>(obj);
}
@Override
- public void destroyObject(PooledObject<Socket> p) throws Exception {
- try {
- Socket s = p.getObject();
+ public void destroyObject(final PooledObject<Socket> p) throws Exception {
+ try (Socket s = p.getObject()) {
if (log.isTraceEnabled()) {
- log.trace(s_loc.get("tcp-close-sending-socket",
- _address + ":" + _port, "" + s.getLocalPort()));
+ log.trace(s_loc.get("tcp-close-sending-socket", _address + ":" + _port, "" + s.getLocalPort()));
}
- s.close();
} catch (Exception e) {
- log.warn(s_loc.get("tcp-close-socket-error",
- _address.getHostAddress() + ":" + _port), e);
+ log.warn(s_loc.get("tcp-close-socket-error", _address.getHostAddress() + ":" + _port), e);
}
}
}
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 37 * hash + Objects.hashCode(this._address);
+ hash = 37 * hash + this._port;
+ return hash;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final HostAddress other = (HostAddress) obj;
+ if (this._port != other._port) {
+ return false;
+ }
+ return Objects.equals(this._address, other._address);
+ }
}
}
diff --git a/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties b/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
index fc40679..6549c4a 100644
--- a/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
+++ b/openjpa-kernel/src/main/resources/org/apache/openjpa/event/localizer.properties
@@ -81,6 +81,9 @@
port "{0}".
tcp-address-asself: Identified address of "{0}", which is equal to ourself.
tcp-address-set: Configured to send to peer "{0}"
+dyntcp-address-not-set: Already configured to send to peer "{0}"
+dyntcp-address-unset: Removed peer "{0}"
+dyntcp-updater-error: Error while updating hosts
tcp-received-event: Received event from peer "{0}"
tcp-open-connection: Creating new socket connection to "{0}", using local port \
"{1}".
diff --git a/openjpa-kubernetes/pom.xml b/openjpa-kubernetes/pom.xml
new file mode 100644
index 0000000..dc8ed27
--- /dev/null
+++ b/openjpa-kubernetes/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!--
+ Please keep the project tag on one line to avoid confusing
+ the release plugin.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-parent</artifactId>
+ <version>3.1.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>openjpa-kubernetes</artifactId>
+ <packaging>jar</packaging>
+ <name>OpenJPA Kubernetes</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-kernel</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java b/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java
new file mode 100644
index 0000000..7b7d945
--- /dev/null
+++ b/openjpa-kubernetes/src/main/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event.kubernetes;
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.openjpa.event.DynamicTCPRemoteCommitProvider;
+import org.apache.openjpa.lib.util.Localizer;
+
+public class KubernetesTCPRemoteCommitProvider extends DynamicTCPRemoteCommitProvider {
+
+ private static final Localizer s_loc = Localizer.forPackage(KubernetesTCPRemoteCommitProvider.class);
+
+ private String _namespace = "<namespace>";
+
+ private String _label = "<label>";
+
+ public KubernetesTCPRemoteCommitProvider() throws UnknownHostException {
+ super();
+ }
+
+ public String getNamespace() {
+ return _namespace;
+ }
+
+ public void setNamespace(final String namespace) {
+ this._namespace = namespace;
+ }
+
+ public String getLabel() {
+ return _label;
+ }
+
+ public void setLabel(final String label) {
+ this._label = label;
+ }
+
+ protected KubernetesClient kubernetesClient() throws KubernetesClientException {
+ return new DefaultKubernetesClient();
+ }
+
+ @Override
+ protected List<String> fetchDynamicAddresses() {
+ List<String> podIPs = new ArrayList<>();
+
+ try (KubernetesClient client = kubernetesClient()) {
+ podIPs.addAll(client.pods().inNamespace(_namespace).withLabel(_label).list().
+ getItems().stream().
+ map(pod -> pod.getStatus().getPodIP()).
+ collect(Collectors.toList()));
+
+ if (log.isTraceEnabled()) {
+ log.trace(s_loc.get("kubernetestcp-pods", podIPs));
+ }
+ } catch (KubernetesClientException e) {
+ if (log.isFatalEnabled()) {
+ log.fatal(s_loc.get("kubernetestcp-error"), e);
+ }
+ }
+
+ return podIPs;
+ }
+}
diff --git a/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties b/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties
new file mode 100644
index 0000000..47fac21
--- /dev/null
+++ b/openjpa-kubernetes/src/main/resources/org/apache/openjpa/event/kubernetes/localizer.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+kubernetestcp-error: Error while setting up Kubernetes client
+kubernetestcp-pods: Pods found
diff --git a/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java b/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java
new file mode 100644
index 0000000..d8e1e1f
--- /dev/null
+++ b/openjpa-kubernetes/src/test/java/org/apache/openjpa/event/kubernetes/KubernetesTCPRemoteCommitProviderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.event.kubernetes;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import java.lang.reflect.Field;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.openjpa.event.TCPRemoteCommitProvider;
+import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.log.SLF4JLogFactory;
+import org.jmock.Expectations;
+import org.jmock.integration.junit4.JUnitRuleMockery;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class KubernetesTCPRemoteCommitProviderTest {
+
+ private static final String NAMESPACE = "ns1";
+
+ private static final String LABEL = "testKey";
+
+ @Rule
+ public JUnitRuleMockery context = new JUnitRuleMockery();
+
+ @Rule
+ public KubernetesServer server = new KubernetesServer(true, true);
+
+ private Pod pod1;
+
+ private Pod pod2;
+
+ private Pod pod3;
+
+ private Pod pod4;
+
+ @Before
+ public void setupKubernetes() {
+ KubernetesClient client = server.getClient();
+
+ pod1 = new PodBuilder().
+ withNewMetadata().
+ withName("pod1").
+ addToLabels(LABEL, "value1").
+ endMetadata().
+ withStatus(new PodStatusBuilder().withPodIP("1.1.1.1").build()).
+ build();
+ client.pods().inNamespace(NAMESPACE).create(pod1);
+
+ pod2 = new PodBuilder().
+ withNewMetadata().
+ withName("pod2").
+ addToLabels(LABEL, "value2").
+ endMetadata().
+ withStatus(new PodStatusBuilder().withPodIP("2.2.2.2").build()).
+ build();
+ client.pods().inNamespace(NAMESPACE).create(pod2);
+
+ pod3 = new PodBuilder().
+ withNewMetadata().
+ withName("pod3").
+ endMetadata().
+ withStatus(new PodStatusBuilder().withPodIP("3.3.3.3").build()).
+ build();
+ client.pods().inNamespace("ns2").create(pod3);
+
+ pod4 = new PodBuilder().
+ withNewMetadata().
+ withName("pod4").
+ addToLabels("other", "value1").
+ endMetadata().
+ withStatus(new PodStatusBuilder().withPodIP("4.4.4.4").build()).
+ build();
+ client.pods().inNamespace(NAMESPACE).create(pod4);
+
+ PodList podList = client.pods().inNamespace(NAMESPACE).withLabel(LABEL).list();
+ assertNotNull(podList);
+ assertEquals(2, podList.getItems().size());
+ assertTrue(podList.getItems().contains(pod1));
+ assertTrue(podList.getItems().contains(pod2));
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<String> getAddresses(final KubernetesTCPRemoteCommitProvider rcp)
+ throws NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
+
+ Field _addresses = TCPRemoteCommitProvider.class.getDeclaredField("_addresses");
+ _addresses.setAccessible(true);
+
+ return new ArrayList<>((List<Object>) _addresses.get(rcp)).stream().
+ map(ReflectionToStringBuilder::toString).
+ map(address -> StringUtils.substringAfter(address, "_address=/")).
+ map(address -> StringUtils.substringBefore(address, ",")).
+ collect(Collectors.toList());
+ }
+
+ @Test
+ public void addresses() throws UnknownHostException, NoSuchFieldException,
+ IllegalArgumentException, IllegalAccessException, InterruptedException {
+
+ // prepare KubernetesTCPRemoteCommitProvider instance, inject mocked Kubernetes client
+ KubernetesTCPRemoteCommitProvider rcp = new KubernetesTCPRemoteCommitProvider() {
+
+ @Override
+ protected KubernetesClient kubernetesClient() throws KubernetesClientException {
+ return server.getClient();
+ }
+ };
+ rcp.setNamespace(NAMESPACE);
+ rcp.setLabel(LABEL);
+ rcp.setCacheDurationMillis(500);
+
+ // mock OpenJPA configuration
+ Configuration conf = context.mock(Configuration.class);
+ context.checking(new Expectations() {
+
+ {
+ oneOf(conf).getLog(with(any(String.class)));
+ will(returnValue(new SLF4JLogFactory().getLog("")));
+ }
+ });
+ rcp.setConfiguration(conf);
+
+ // finalize
+ rcp.endConfiguration();
+
+ // expect to find remote addresses of matching pods
+ List<String> addresses = getAddresses(rcp);
+ assertEquals(2, addresses.size());
+ assertTrue(addresses.contains(pod1.getStatus().getPodIP()));
+ assertTrue(addresses.contains(pod2.getStatus().getPodIP()));
+
+ Thread.sleep(500);
+
+ addresses = getAddresses(rcp);
+ assertEquals(2, addresses.size());
+ assertTrue(addresses.contains(pod1.getStatus().getPodIP()));
+ assertTrue(addresses.contains(pod2.getStatus().getPodIP()));
+ }
+}
diff --git a/openjpa/pom.xml b/openjpa/pom.xml
index c3ed6d5..7a93f96 100644
--- a/openjpa/pom.xml
+++ b/openjpa/pom.xml
@@ -83,6 +83,7 @@
<include>org.apache.openjpa:openjpa-persistence-jdbc</include>
<include>org.apache.openjpa:openjpa-xmlstore</include>
<include>org.apache.openjpa:openjpa-slice</include>
+ <include>org.apache.openjpa:openjpa-kubernetes</include>
</includes>
</artifactSet>
<!-- OpenJPA unique META-INF setup -->
@@ -234,5 +235,10 @@
<artifactId>openjpa-slice</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-kubernetes</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pom.xml b/pom.xml
index bc55e4d..8b10a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
<mssql.connector.version>7.2.1</mssql.connector.version>
<!-- other common versions -->
+ <kubernetes-client.version>4.7.0</kubernetes-client.version>
<slf4j.version>1.7.23</slf4j.version>
<!-- Compile Java source/target class level -->
<compile.class.source>${java.class.version}</compile.class.source>
@@ -174,6 +175,7 @@
<module>openjpa-xmlstore</module>
<module>openjpa-slice</module>
<module>openjpa-jest</module>
+ <module>openjpa-kubernetes</module>
<module>openjpa</module>
<module>openjpa-project</module>
<module>openjpa-examples</module>
@@ -1790,6 +1792,16 @@
<version>2.11.1</version>
</dependency>
<dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${kubernetes-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <version>${kubernetes-client.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>