/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.distributed.cache.server;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.SerializationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
| |
| public class TestServerAndClient { |
| |
| private static final Logger LOGGER; |
| |
| static { |
| System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); |
| System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.AbstractCacheServer", "debug"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.client.DistributedMapCacheClientService", "debug"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.TestServerAndClient", "debug"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.remote.io.socket.ssl.SSLSocketChannel", "trace"); |
| LOGGER = LoggerFactory.getLogger(TestServerAndClient.class); |
| } |
| |
| @Test |
| public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| // Create server |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| final DistributedSetCacheServer server = new SetServer(); |
| runner.addControllerService("server", server); |
| runner.enableControllerService(server); |
| |
| final DistributedSetCacheClientService client = createClient(server.getPort()); |
| final Serializer<String> serializer = new StringSerializer(); |
| final boolean added = client.addIfAbsent("test", serializer); |
| assertTrue(added); |
| |
| final boolean contains = client.contains("test", serializer); |
| assertTrue(contains); |
| |
| final boolean addedAgain = client.addIfAbsent("test", serializer); |
| assertFalse(addedAgain); |
| |
| final boolean removed = client.remove("test", serializer); |
| assertTrue(removed); |
| |
| final boolean containedAfterRemove = client.contains("test", serializer); |
| assertFalse(containedAfterRemove); |
| |
| server.shutdownServer(); |
| } |
| |
| @Test |
| public void testPersistentSetServerAndClient() throws InitializationException, IOException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| |
| final File dataFile = new File("target/cache-data"); |
| deleteRecursively(dataFile); |
| |
| // Create server |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| final DistributedSetCacheServer server = new SetServer(); |
| runner.addControllerService("server", server); |
| runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.enableControllerService(server); |
| |
| DistributedSetCacheClientService client = createClient(server.getPort()); |
| final Serializer<String> serializer = new StringSerializer(); |
| final boolean added = client.addIfAbsent("test", serializer); |
| final boolean added2 = client.addIfAbsent("test2", serializer); |
| assertTrue(added); |
| assertTrue(added2); |
| |
| final boolean contains = client.contains("test", serializer); |
| final boolean contains2 = client.contains("test2", serializer); |
| assertTrue(contains); |
| assertTrue(contains2); |
| |
| final boolean addedAgain = client.addIfAbsent("test", serializer); |
| assertFalse(addedAgain); |
| |
| final boolean removed = client.remove("test", serializer); |
| assertTrue(removed); |
| |
| final boolean containedAfterRemove = client.contains("test", serializer); |
| assertFalse(containedAfterRemove); |
| |
| client.close(); |
| server.shutdownServer(); |
| |
| final DistributedSetCacheServer newServer = new SetServer(); |
| runner.addControllerService("server2", newServer); |
| runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.enableControllerService(newServer); |
| client = createClient(newServer.getPort()); |
| |
| assertFalse(client.contains("test", serializer)); |
| assertTrue(client.contains("test2", serializer)); |
| |
| client.close(); |
| newServer.shutdownServer(); |
| } |
| |
| @Test |
| public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| // Create server |
| final File dataFile = new File("target/cache-data"); |
| deleteRecursively(dataFile); |
| |
| // Create server |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| final DistributedSetCacheServer server = new SetServer(); |
| runner.addControllerService("server", server); |
| runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); |
| runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU); |
| runner.enableControllerService(server); |
| |
| DistributedSetCacheClientService client = createClient(server.getPort()); |
| final Serializer<String> serializer = new StringSerializer(); |
| final boolean added = client.addIfAbsent("test", serializer); |
| waitABit(); |
| final boolean added2 = client.addIfAbsent("test2", serializer); |
| waitABit(); |
| final boolean added3 = client.addIfAbsent("test3", serializer); |
| waitABit(); |
| assertTrue(added); |
| assertTrue(added2); |
| assertTrue(added3); |
| |
| final boolean contains = client.contains("test", serializer); |
| final boolean contains2 = client.contains("test2", serializer); |
| assertTrue(contains); |
| assertTrue(contains2); |
| |
| final boolean addedAgain = client.addIfAbsent("test", serializer); |
| assertFalse(addedAgain); |
| |
| final boolean added4 = client.addIfAbsent("test4", serializer); |
| assertTrue(added4); |
| |
| // ensure that added3 was evicted because it was used least frequently |
| assertFalse(client.contains("test3", serializer)); |
| |
| client.close(); |
| server.shutdownServer(); |
| |
| |
| final DistributedSetCacheServer newServer = new SetServer(); |
| runner.addControllerService("server2", newServer); |
| runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.enableControllerService(newServer); |
| client = createClient(newServer.getPort()); |
| |
| assertTrue(client.contains("test", serializer)); |
| assertTrue(client.contains("test2", serializer)); |
| assertFalse(client.contains("test3", serializer)); |
| assertTrue(client.contains("test4", serializer)); |
| |
| client.close(); |
| newServer.shutdownServer(); |
| } |
| |
| @Test |
| public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| // Create server |
| final File dataFile = new File("target/cache-data"); |
| deleteRecursively(dataFile); |
| |
| // Create server |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| final DistributedMapCacheServer server = new MapServer(); |
| runner.addControllerService("server", server); |
| runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3"); |
| runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU); |
| runner.enableControllerService(server); |
| |
| DistributedMapCacheClientService client = createMapClient(server.getPort()); |
| final Serializer<String> serializer = new StringSerializer(); |
| final boolean added = client.putIfAbsent("test", "1", serializer, serializer); |
| waitABit(); |
| final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer); |
| waitABit(); |
| final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer); |
| waitABit(); |
| assertTrue(added); |
| assertTrue(added2); |
| assertTrue(added3); |
| |
| final boolean contains = client.containsKey("test", serializer); |
| final boolean contains2 = client.containsKey("test2", serializer); |
| assertTrue(contains); |
| assertTrue(contains2); |
| |
| final Deserializer<String> deserializer = new StringDeserializer(); |
| final Set<String> keys = client.keySet(deserializer); |
| assertEquals(3, keys.size()); |
| assertTrue(keys.contains("test")); |
| assertTrue(keys.contains("test2")); |
| assertTrue(keys.contains("test3")); |
| |
| final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); |
| assertFalse(addedAgain); |
| |
| final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer); |
| assertTrue(added4); |
| |
| // ensure that added3 was evicted because it was used least frequently |
| assertFalse(client.containsKey("test3", serializer)); |
| |
| client.close(); |
| server.shutdownServer(); |
| |
| final DistributedMapCacheServer newServer = new MapServer(); |
| runner.addControllerService("server2", newServer); |
| runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.enableControllerService(newServer); |
| client = createMapClient(newServer.getPort()); |
| |
| assertTrue(client.containsKey("test", serializer)); |
| assertTrue(client.containsKey("test2", serializer)); |
| assertFalse(client.containsKey("test3", serializer)); |
| assertTrue(client.containsKey("test4", serializer)); |
| |
| // Test removeByPattern, the first two should be removed and the last should remain |
| client.put("test.1", "1", serializer, serializer); |
| client.put("test.2", "2", serializer, serializer); |
| client.put("test3", "2", serializer, serializer); |
| final long removedTwo = client.removeByPattern("test\\..*"); |
| assertEquals(2L, removedTwo); |
| assertFalse(client.containsKey("test.1", serializer)); |
| assertFalse(client.containsKey("test.2", serializer)); |
| assertTrue(client.containsKey("test3", serializer)); |
| |
| // test removeByPatternAndGet |
| client.put("test.1", "1", serializer, serializer); |
| client.put("test.2", "2", serializer, serializer); |
| Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); |
| assertEquals(2, removed.size()); |
| assertTrue(removed.containsKey("test.1")); |
| assertTrue(removed.containsKey("test.2")); |
| assertFalse(client.containsKey("test.1", serializer)); |
| assertFalse(client.containsKey("test.2", serializer)); |
| assertTrue(client.containsKey("test3", serializer)); |
| removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); |
| assertEquals(0, removed.size()); |
| |
| client.close(); |
| newServer.shutdownServer(); |
| } |
| |
| @Test |
| public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| |
| final File dataFile = new File("target/cache-data"); |
| deleteRecursively(dataFile); |
| |
| // Create server |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| final DistributedSetCacheServer server = new SetServer(); |
| runner.addControllerService("server", server); |
| runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); |
| runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); |
| runner.enableControllerService(server); |
| |
| DistributedSetCacheClientService client = createClient(server.getPort()); |
| final Serializer<String> serializer = new StringSerializer(); |
| |
| // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond |
| // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between |
| final boolean added = client.addIfAbsent("test", serializer); |
| waitABit(); |
| final boolean added2 = client.addIfAbsent("test2", serializer); |
| waitABit(); |
| final boolean added3 = client.addIfAbsent("test3", serializer); |
| waitABit(); |
| |
| assertTrue(added); |
| assertTrue(added2); |
| assertTrue(added3); |
| |
| final boolean contains = client.contains("test", serializer); |
| final boolean contains2 = client.contains("test2", serializer); |
| assertTrue(contains); |
| assertTrue(contains2); |
| |
| final boolean addedAgain = client.addIfAbsent("test", serializer); |
| assertFalse(addedAgain); |
| |
| final boolean added4 = client.addIfAbsent("test4", serializer); |
| assertTrue(added4); |
| |
| // ensure that added3 was evicted because it was used least frequently |
| assertFalse(client.contains("test", serializer)); |
| assertTrue(client.contains("test3", serializer)); |
| |
| client.close(); |
| server.shutdownServer(); |
| |
| final DistributedSetCacheServer newServer = new SetServer(); |
| runner.addControllerService("server2", newServer); |
| runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); |
| runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); |
| runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); |
| runner.enableControllerService(newServer); |
| |
| client = createClient(newServer.getPort()); |
| assertFalse(client.contains("test", serializer)); |
| assertTrue(client.contains("test2", serializer)); |
| assertTrue(client.contains("test3", serializer)); |
| assertTrue(client.contains("test4", serializer)); |
| |
| client.close(); |
| newServer.shutdownServer(); |
| } |
| |
| @Test |
| public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| |
| // Create server |
| final DistributedMapCacheServer server = new MapServer(); |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| runner.addControllerService("server", server); |
| runner.enableControllerService(server); |
| |
| DistributedMapCacheClientService client = new DistributedMapCacheClientService(); |
| runner.addControllerService("client", client); |
| runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); |
| runner.enableControllerService(client); |
| |
| final Serializer<String> valueSerializer = new StringSerializer(); |
| final Serializer<String> keySerializer = new StringSerializer(); |
| final Deserializer<String> deserializer = new StringDeserializer(); |
| |
| final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); |
| assertEquals(null, original); |
| LOGGER.debug("end getAndPutIfAbsent"); |
| |
| final boolean contains = client.containsKey("testKey", keySerializer); |
| assertTrue(contains); |
| LOGGER.debug("end containsKey"); |
| |
| final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); |
| assertFalse(added); |
| LOGGER.debug("end putIfAbsent"); |
| |
| final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); |
| assertEquals("test", originalAfterPut); |
| LOGGER.debug("end getAndPutIfAbsent"); |
| |
| final boolean removed = client.remove("testKey", keySerializer); |
| assertTrue(removed); |
| LOGGER.debug("end remove"); |
| |
| client.put("testKey", "testValue", keySerializer, valueSerializer); |
| assertTrue(client.containsKey("testKey", keySerializer)); |
| String removedValue = client.removeAndGet("testKey", keySerializer, deserializer); |
| assertEquals("testValue", removedValue); |
| removedValue = client.removeAndGet("testKey", keySerializer, deserializer); |
| assertNull(removedValue); |
| |
| final Set<String> keys = client.keySet(deserializer); |
| assertEquals(0, keys.size()); |
| |
| // Test removeByPattern, the first two should be removed and the last should remain |
| client.put("test.1", "1", keySerializer, keySerializer); |
| client.put("test.2", "2", keySerializer, keySerializer); |
| client.put("test3", "2", keySerializer, keySerializer); |
| final long removedTwo = client.removeByPattern("test\\..*"); |
| assertEquals(2L, removedTwo); |
| assertFalse(client.containsKey("test.1", keySerializer)); |
| assertFalse(client.containsKey("test.2", keySerializer)); |
| assertTrue(client.containsKey("test3", keySerializer)); |
| |
| final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); |
| assertFalse(containedAfterRemove); |
| |
| client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); |
| runner.disableControllerService(client); |
| try { |
| client.containsKey("testKey", keySerializer); |
| fail("Should be closed and not accessible"); |
| } catch (final Exception e) { |
| |
| } |
| |
| DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); |
| runner.addControllerService("client2", client2); |
| runner.setProperty(client2, DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| runner.setProperty(client2, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); |
| runner.enableControllerService(client2); |
| |
| assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer)); |
| assertTrue(client2.containsKey("testKey", keySerializer)); |
| server.shutdownServer(); |
| Thread.sleep(1000); |
| try { |
| client2.containsKey("testKey", keySerializer); |
| fail("Should have blown exception!"); |
| } catch (final ConnectException e) { |
| client2 = null; |
| } |
| LOGGER.debug("end testNonPersistentMapServerAndClient"); |
| } |
| |
| @Test |
| public void testClientTermination() throws InitializationException, IOException, InterruptedException { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| // Create server |
| final DistributedMapCacheServer server = new MapServer(); |
| final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); |
| server.initialize(serverInitContext); |
| |
| final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); |
| final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); |
| server.startServer(serverContext); |
| |
| DistributedMapCacheClientService client = new DistributedMapCacheClientService(); |
| MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); |
| client.initialize(clientInitContext); |
| |
| final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); |
| MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); |
| client.onEnabled(clientContext); |
| final Serializer<String> valueSerializer = new StringSerializer(); |
| final Serializer<String> keySerializer = new StringSerializer(); |
| final Deserializer<String> deserializer = new StringDeserializer(); |
| |
| final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); |
| assertEquals(null, original); |
| |
| final boolean contains = client.containsKey("testKey", keySerializer); |
| assertTrue(contains); |
| |
| final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); |
| assertFalse(added); |
| |
| final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); |
| assertEquals("test", originalAfterPut); |
| |
| final boolean removed = client.remove("testKey", keySerializer); |
| assertTrue(removed); |
| |
| final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); |
| assertFalse(containedAfterRemove); |
| |
| client = null; |
| clientInitContext = null; |
| clientContext = null; |
| Thread.sleep(2000); |
| System.gc(); |
| server.shutdownServer(); |
| } |
| |
| @Test |
| public void testOptimisticLock() throws Exception { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| |
| // Create server |
| final DistributedMapCacheServer server = new MapServer(); |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| runner.addControllerService("server", server); |
| runner.enableControllerService(server); |
| |
| DistributedMapCacheClientService client1 = new DistributedMapCacheClientService(); |
| MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1"); |
| client1.initialize(clientInitContext1); |
| |
| DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); |
| MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); |
| client2.initialize(clientInitContext2); |
| |
| final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); |
| clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); |
| |
| MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup()); |
| client1.onEnabled(clientContext1); |
| MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup()); |
| client2.onEnabled(clientContext2); |
| |
| final Serializer<String> stringSerializer = new StringSerializer(); |
| final Deserializer<String> stringDeserializer = new StringDeserializer(); |
| |
| final String key = "test-optimistic-lock"; |
| |
| // Ensure there's no existing key |
| assertFalse(client1.containsKey(key, stringSerializer)); |
| assertNull(client1.fetch(key, stringSerializer, stringDeserializer)); |
| |
| // Client 1 inserts the key. |
| client1.put(key, "valueC1-0", stringSerializer, stringSerializer); |
| |
| // Client 1 and 2 fetch the key |
| AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer); |
| AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer); |
| assertEquals(new Long(0), c1.getRevision().orElse(0L)); |
| assertEquals("valueC1-0", c1.getValue()); |
| assertEquals(new Long(0), c2.getRevision().orElse(0L)); |
| assertEquals("valueC1-0", c2.getValue()); |
| |
| // Client 1 replace |
| c1.setValue("valueC1-1"); |
| boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer); |
| assertTrue("C1 should be able to replace the key", c1Result); |
| // Client 2 replace with the old revision |
| c2.setValue("valueC2-1"); |
| boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer); |
| assertFalse("C2 shouldn't be able to replace the key", c2Result); |
| |
| // Client 2 fetch the key again |
| c2 = client2.fetch(key, stringSerializer, stringDeserializer); |
| assertEquals("valueC1-1", c2.getValue()); |
| assertEquals(new Long(1), c2.getRevision().orElse(0L)); |
| |
| // Now, Client 2 knows the correct revision so it can replace the key |
| c2.setValue("valueC2-2"); |
| c2Result = client2.replace(c2, stringSerializer, stringSerializer); |
| assertTrue("C2 should be able to replace the key", c2Result); |
| |
| // Assert the cache |
| c2 = client2.fetch(key, stringSerializer, stringDeserializer); |
| assertEquals("valueC2-2", c2.getValue()); |
| assertEquals(new Long(2), c2.getRevision().orElse(0L)); |
| |
| client1.close(); |
| client2.close(); |
| server.shutdownServer(); |
| } |
| |
| @Test |
| public void testBackwardCompatibility() throws Exception { |
| LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); |
| |
| final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); |
| |
| // Create a server that only supports protocol version 1. |
| final DistributedMapCacheServer server = new MapServer() { |
| @Override |
| protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException { |
| return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir) { |
| @Override |
| protected StandardVersionNegotiator getVersionNegotiator() { |
| return new StandardVersionNegotiator(1); |
| } |
| }; |
| } |
| }; |
| runner.addControllerService("server", server); |
| runner.enableControllerService(server); |
| |
| DistributedMapCacheClientService client = new DistributedMapCacheClientService(); |
| MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client"); |
| client.initialize(clientInitContext1); |
| |
| final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); |
| clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); |
| |
| MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup()); |
| client.onEnabled(clientContext); |
| |
| final Serializer<String> stringSerializer = new StringSerializer(); |
| final Deserializer<String> stringDeserializer = new StringDeserializer(); |
| |
| final String key = "test-backward-compatibility"; |
| |
| // Version 1 operations should work |
| client.put(key, "value1", stringSerializer, stringSerializer); |
| assertEquals("value1", client.get(key, stringSerializer, stringDeserializer)); |
| |
| assertTrue(client.containsKey(key, stringSerializer)); |
| |
| try { |
| client.fetch(key, stringSerializer, stringDeserializer); |
| fail("Version 2 operations should NOT work."); |
| } catch (UnsupportedOperationException e) { |
| } |
| |
| try { |
| AtomicCacheEntry<String,String,Long> entry = new AtomicCacheEntry<>(key, "value2", 0L); |
| client.replace(entry, stringSerializer, stringSerializer); |
| fail("Version 2 operations should NOT work."); |
| } catch (UnsupportedOperationException e) { |
| } |
| |
| try { |
| Set<String> keys = client.keySet(stringDeserializer); |
| fail("Version 3 operations should NOT work."); |
| } catch (UnsupportedOperationException e) { |
| } |
| |
| try { |
| String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer); |
| fail("Version 3 operations should NOT work."); |
| } catch (UnsupportedOperationException e) { |
| } |
| |
| try { |
| Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer); |
| fail("Version 3 operations should NOT work."); |
| } catch (UnsupportedOperationException e) { |
| } |
| client.close(); |
| server.shutdownServer(); |
| } |
| |
| |
| private void waitABit() { |
| try { |
| Thread.sleep(10L); |
| } catch (final InterruptedException e) { |
| } |
| } |
| |
| private DistributedSetCacheClientService createClient(final int port) throws InitializationException { |
| final DistributedSetCacheClientService client = new DistributedSetCacheClientService(); |
| final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); |
| client.initialize(clientInitContext); |
| |
| final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost"); |
| clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port)); |
| final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); |
| client.onEnabled(clientContext); |
| |
| return client; |
| } |
| |
| private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException { |
| final DistributedMapCacheClientService client = new DistributedMapCacheClientService(); |
| final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); |
| client.initialize(clientInitContext); |
| |
| final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); |
| clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port)); |
| final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); |
| client.onEnabled(clientContext); |
| |
| return client; |
| } |
| |
| private static class StringSerializer implements Serializer<String> { |
| |
| @Override |
| public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { |
| output.write(value.getBytes(StandardCharsets.UTF_8)); |
| } |
| } |
| |
| private static class StringDeserializer implements Deserializer<String> { |
| |
| @Override |
| public String deserialize(final byte[] input) throws DeserializationException, IOException { |
| return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8); |
| } |
| } |
| |
| private static void deleteRecursively(final File dataFile) throws IOException { |
| if (dataFile == null || !dataFile.exists()) { |
| return; |
| } |
| |
| final File[] children = dataFile.listFiles(); |
| for (final File child : children) { |
| if (child.isDirectory()) { |
| deleteRecursively(child); |
| } else { |
| for (int i = 0; i < 100 && child.exists(); i++) { |
| child.delete(); |
| } |
| |
| if (child.exists()) { |
| throw new IOException("Could not delete " + dataFile.getAbsolutePath()); |
| } |
| } |
| } |
| } |
| |
| private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor> descriptors) { |
| descriptors.remove(DistributedCacheServer.PORT); |
| descriptors.add(new PropertyDescriptor.Builder() |
| .name("Port") |
| .description("The port to listen on for incoming connections") |
| .required(true) |
| .addValidator(StandardValidators.createLongValidator(0L, 65535L, true)) |
| .defaultValue("0") |
| .build()); |
| return descriptors; |
| } |
| |
| private static class SetServer extends DistributedSetCacheServer { |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return replacePortDescriptor(super.getSupportedPropertyDescriptors()); |
| } |
| } |
| |
| private static class MapServer extends DistributedMapCacheServer { |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return replacePortDescriptor(super.getSupportedPropertyDescriptors()); |
| } |
| } |
| } |