blob: 51a40ce85549747e3d27a6564a71dd9ee6d0cd54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.testcontainers.RedisContainer;
import org.apache.nifi.redis.testcontainers.RedisReplicaContainer;
import org.apache.nifi.redis.testcontainers.RedisSentinelContainer;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_MODE_SENTINEL;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This is an integration test that is meant to be run against a real Redis instance.
*/
public class ITRedisDistributedMapCacheClientService {
public static final String CONTAINER_IMAGE_TAG = "redis:7.0.12-alpine";
private static final String masterName = "redisLeader";
@TempDir
private Path testDirectory;
private final TestRedisProcessor proc = new TestRedisProcessor();
private final TestRunner testRunner = TestRunners.newTestRunner(proc);
private final List<RedisContainer> redisContainers = new ArrayList<>();
private RedisConnectionPoolService redisConnectionPool;
@Test
public void testStandaloneRedis() throws InitializationException, IOException {
int redisPort = setupStandaloneRedis(null,null).port;
setUpRedisConnectionPool(portsToConnectionString(redisPort), pool -> {
// uncomment this to test using a different database index than the default 0
// testRunner.setProperty(pool, RedisUtils.DATABASE, "1");
});
setupRedisMapCacheClientService();
executeProcessor();
}
@Test
public void testStandaloneRedisWithUsernameAndPasswordAuthentication() throws InitializationException, IOException {
final String redisUsername = "foo";
final String redisPassword = "foobared";
final int redisPort = setupStandaloneRedis(redisUsername, redisPassword).port;
setUpRedisConnectionPool(portsToConnectionString(redisPort), pool -> {
testRunner.setProperty(redisConnectionPool, RedisUtils.USERNAME, redisUsername);
testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, redisPassword);
});
setupRedisMapCacheClientService();
executeProcessor();
}
@Test
public void testStandaloneRedisWithDefaultUserAuthentication() throws InitializationException, IOException {
final String redisPassword = "foobared";
final int redisPort = setupStandaloneRedis(null, redisPassword).port;
setUpRedisConnectionPool(portsToConnectionString(redisPort), pool -> {
testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, redisPassword);
});
setupRedisMapCacheClientService();
executeProcessor();
}
@Test
public void testSentinelRedis() throws InitializationException, IOException {
RedisContainer redisMasterContainer = setupStandaloneRedis(null,null);
String masterHost = "127.0.0.1";
int masterPort = redisMasterContainer.port;
setUpRedisReplica(masterHost, masterPort, null, null);
setUpRedisReplica(masterHost, masterPort, null, null);
int sentinelAPort = setUpSentinel(masterHost, masterPort, null, null, 2, null, null).port;
int sentinelBPort = setUpSentinel(masterHost, masterPort, null, null, 2, null, null).port;
int sentinelCPort = setUpSentinel(masterHost, masterPort, null, null, 2, null, null).port;
setUpRedisConnectionPool(portsToConnectionString(sentinelAPort, sentinelBPort, sentinelCPort), pool -> {
testRunner.setProperty(redisConnectionPool, RedisUtils.REDIS_MODE, REDIS_MODE_SENTINEL);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_MASTER, masterName);
});
setupRedisMapCacheClientService();
executeProcessor();
}
@Test
public void testSentinelRedisWithDefaultUserAuthentication() throws InitializationException, IOException {
String redisPassword = "t0p_53cr35";
String sentinelPassword = "otherPassword";
RedisContainer redisMasterContainer = setupStandaloneRedis(null, redisPassword);
String masterHost = "127.0.0.1";
int masterPort = redisMasterContainer.port;
setUpRedisReplica(masterHost, masterPort, null, redisPassword);
setUpRedisReplica(masterHost, masterPort, null, redisPassword);
int sentinelAPort = setUpSentinel(masterHost, masterPort, null, redisPassword, 2, null, sentinelPassword).port;
int sentinelBPort = setUpSentinel(masterHost, masterPort, null, redisPassword, 2, null, sentinelPassword).port;
int sentinelCPort = setUpSentinel(masterHost, masterPort, null, redisPassword, 2, null, sentinelPassword).port;
setUpRedisConnectionPool(portsToConnectionString(sentinelAPort, sentinelBPort, sentinelCPort), pool -> {
testRunner.setProperty(redisConnectionPool, RedisUtils.REDIS_MODE, REDIS_MODE_SENTINEL);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_MASTER, masterName);
testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, redisPassword);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_PASSWORD, sentinelPassword);
});
setupRedisMapCacheClientService();
executeProcessor();
}
@Test
public void testSentinelRedisWithUsernameAndPasswordAuthentication() throws InitializationException, IOException {
final String redisUser = "redisUser";
final String redisPassword = "t0p_53cr35";
final String sentinelUsername = "sentinelUser";
final String sentinelPassword = "otherPassword";
final RedisContainer redisMasterContainer = setupStandaloneRedis(redisUser, redisPassword);
final String masterHost = "127.0.0.1";
final int masterPort = redisMasterContainer.port;
setUpRedisReplica(masterHost, masterPort, redisUser, redisPassword);
setUpRedisReplica(masterHost, masterPort, redisUser, redisPassword);
int sentinelAPort = setUpSentinel(masterHost, masterPort, redisUser, redisPassword, 2, sentinelUsername, sentinelPassword).port;
int sentinelBPort = setUpSentinel(masterHost, masterPort, redisUser, redisPassword, 2, sentinelUsername, sentinelPassword).port;
int sentinelCPort = setUpSentinel(masterHost, masterPort, redisUser, redisPassword, 2, sentinelUsername, sentinelPassword).port;
setUpRedisConnectionPool(portsToConnectionString(sentinelAPort, sentinelBPort, sentinelCPort), pool -> {
testRunner.setProperty(redisConnectionPool, RedisUtils.REDIS_MODE, REDIS_MODE_SENTINEL);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_MASTER, masterName);
testRunner.setProperty(redisConnectionPool, RedisUtils.USERNAME, redisUser);
testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, redisPassword);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_USERNAME, sentinelUsername);
testRunner.setProperty(redisConnectionPool, RedisUtils.SENTINEL_PASSWORD, sentinelPassword);
});
setupRedisMapCacheClientService();
executeProcessor();
}
@AfterEach
public void teardown() {
if (redisConnectionPool != null) {
redisConnectionPool.onDisabled();
}
redisContainers.forEach(RedisContainer::stop);
}
private RedisContainer setupStandaloneRedis(@Nullable final String redisUsername, final @Nullable String redisPassword) throws IOException {
int redisPort = getAvailablePort();
RedisContainer redisContainer = new RedisContainer(CONTAINER_IMAGE_TAG);
redisContainer.mountConfigurationFrom(testDirectory);
redisContainer.setPort(redisPort);
redisContainer.addPortBinding(redisPort, redisPort);
redisContainer.setUsername(redisUsername);
redisContainer.setPassword(redisPassword);
redisContainers.add(redisContainer);
redisContainer.start();
return redisContainer;
}
private RedisReplicaContainer setUpRedisReplica(final @NonNull String masterHost,
final int masterPort,
final @Nullable String redisUsername,
final @Nullable String redisPassword) throws IOException {
int replicaPort = getAvailablePort();
RedisReplicaContainer redisReplicaContainer = new RedisReplicaContainer(CONTAINER_IMAGE_TAG);
redisReplicaContainer.mountConfigurationFrom(testDirectory);
redisReplicaContainer.setPort(replicaPort);
redisReplicaContainer.addPortBinding(replicaPort, replicaPort);
redisReplicaContainer.setReplicaOf(masterHost, masterPort);
redisReplicaContainer.setUsername(redisUsername);
redisReplicaContainer.setPassword(redisPassword);
redisContainers.add(redisReplicaContainer);
redisReplicaContainer.start();
return redisReplicaContainer;
}
private RedisSentinelContainer setUpSentinel(final @NonNull String masterHost,
final int masterPort,
final @Nullable String redisUsername,
final @Nullable String redisPassword,
final int quorumSize,
final @Nullable String sentinelUsername,
final @Nullable String sentinelPassword) throws IOException {
int sentinelPort = getAvailablePort();
RedisSentinelContainer redisSentinelContainer = new RedisSentinelContainer(CONTAINER_IMAGE_TAG);
redisSentinelContainer.mountConfigurationFrom(testDirectory);
redisSentinelContainer.setPort(sentinelPort);
redisSentinelContainer.addPortBinding(sentinelPort, sentinelPort);
redisSentinelContainer.setMasterHost(masterHost);
redisSentinelContainer.setMasterPort(masterPort);
redisSentinelContainer.setMasterName(masterName);
redisSentinelContainer.setQuorumSize(quorumSize);
redisSentinelContainer.setUsername(redisUsername);
redisSentinelContainer.setPassword(redisPassword);
redisSentinelContainer.setSentinelUsername(sentinelUsername);
redisSentinelContainer.setSentinelPassword(sentinelPassword);
redisContainers.add(redisSentinelContainer);
redisSentinelContainer.start();
return redisSentinelContainer;
}
private int getAvailablePort() throws IOException {
try (SocketChannel socket = SocketChannel.open()) {
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.bind(new InetSocketAddress("localhost", 0));
return socket.socket().getLocalPort();
}
}
private String portsToConnectionString(int... ports) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < ports.length; i++) {
if (i > 0) {
stringBuilder.append(',');
}
stringBuilder.append("localhost");
stringBuilder.append(':');
stringBuilder.append(ports[i]);
}
return stringBuilder.toString();
}
public void setUpRedisConnectionPool(String connectionString, Consumer<RedisConnectionPool> initializer) throws InitializationException {
// create, configure, and enable the RedisConnectionPool service
redisConnectionPool = new RedisConnectionPoolService();
testRunner.addControllerService("redis-connection-pool", redisConnectionPool);
testRunner.setProperty(redisConnectionPool, RedisUtils.CONNECTION_STRING, connectionString);
if (initializer != null) {
initializer.accept(redisConnectionPool);
}
testRunner.enableControllerService(redisConnectionPool);
}
private void setupRedisMapCacheClientService() throws InitializationException {
// create, configure, and enable the RedisDistributedMapCacheClient service
RedisDistributedMapCacheClientService redisMapCacheClientService = new RedisDistributedMapCacheClientService();
testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService);
testRunner.setProperty(redisMapCacheClientService, REDIS_CONNECTION_POOL, "redis-connection-pool");
testRunner.enableControllerService(redisMapCacheClientService);
testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client");
}
private void executeProcessor() {
// queue a flow file to trigger the processor and executeProcessor it
testRunner.enqueue("trigger");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(TestRedisProcessor.REL_SUCCESS, 1);
}
/**
* Test processor that exercises RedisDistributedMapCacheClient.
*/
private static class TestRedisProcessor extends AbstractProcessor {
public static final PropertyDescriptor REDIS_MAP_CACHE = new PropertyDescriptor.Builder()
.name("redis-map-cache")
.displayName("Redis Map Cache")
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(REDIS_MAP_CACHE);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final Deserializer<String> stringDeserializerWithoutNullCheck = new StringDeserializerWithoutNullCheck();
final AtomicDistributedMapCacheClient cacheClient = context.getProperty(REDIS_MAP_CACHE).asControllerService(AtomicDistributedMapCacheClient.class);
try {
final long timestamp = System.currentTimeMillis();
final String key = "test-redis-processor-" + timestamp;
final String value = "the time is " + timestamp;
// verify the key doesn't exist, put the key/value, then verify it exists
assertFalse(cacheClient.containsKey(key, stringSerializer));
cacheClient.put(key, value, stringSerializer, stringSerializer);
assertTrue(cacheClient.containsKey(key, stringSerializer));
// verify get returns the expected value we set above
final String retrievedValue = cacheClient.get(key, stringSerializer, stringDeserializer);
assertEquals(value, retrievedValue);
// verify get returns null for a key that doesn't exist
final String missingValue = cacheClient.get("does-not-exist", stringSerializer, stringDeserializerWithoutNullCheck);
assertNull(missingValue);
// verify remove removes the entry and contains key returns false after
assertTrue(cacheClient.remove(key, stringSerializer));
assertFalse(cacheClient.containsKey(key, stringSerializer));
// verify putIfAbsent works the first time and returns false the second time
assertTrue(cacheClient.putIfAbsent(key, value, stringSerializer, stringSerializer));
assertFalse(cacheClient.putIfAbsent(key, "some other value", stringSerializer, stringSerializer));
assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify that getAndPutIfAbsent returns the existing value and doesn't modify it in the cache
final String getAndPutIfAbsentResult = cacheClient.getAndPutIfAbsent(key, value, stringSerializer, stringSerializer, stringDeserializer);
assertEquals(value, getAndPutIfAbsentResult);
assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify that getAndPutIfAbsent on a key that doesn't exist returns null
final String keyThatDoesntExist = key + "_DOES_NOT_EXIST";
assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer));
final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer);
assertNull(getAndPutIfAbsentResultWhenDoesntExist);
assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer));
// verify atomic fetch returns the correct entry
final AtomicCacheEntry<String, String, byte[]> entry = cacheClient.fetch(key, stringSerializer, stringDeserializer);
assertEquals(key, entry.getKey());
assertEquals(value, entry.getValue());
assertArrayEquals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null));
final AtomicCacheEntry<String, String, byte[]> notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8));
// verify atomic replace does not replace when previous value is not equal
assertFalse(cacheClient.replace(notLatestEntry, stringSerializer, stringSerializer));
assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify atomic replace does replace when previous value is equal
final String replacementValue = "this value has been replaced";
entry.setValue(replacementValue);
assertTrue(cacheClient.replace(entry, stringSerializer, stringSerializer));
assertEquals(replacementValue, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify atomic replace does replace no value previous existed
final String replaceKeyDoesntExist = key + "_REPLACE_DOES_NOT_EXIST";
final AtomicCacheEntry<String, String, byte[]> entryDoesNotExist = new AtomicCacheEntry<>(replaceKeyDoesntExist, replacementValue, null);
assertTrue(cacheClient.replace(entryDoesNotExist, stringSerializer, stringSerializer));
assertEquals(replacementValue, cacheClient.get(replaceKeyDoesntExist, stringSerializer, stringDeserializer));
final int numToDelete = 2000;
for (int i = 0; i < numToDelete; i++) {
cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer);
}
Map<String, String> bulk = new HashMap<>();
bulk.put("bulk-1", "testing1");
bulk.put("bulk-2", "testing2");
bulk.put("bulk-3", "testing3");
bulk.put("bulk-4", "testing4");
bulk.put("bulk-5", "testing5");
cacheClient.putAll(bulk, stringSerializer, stringSerializer);
assertTrue(cacheClient.containsKey("bulk-1", stringSerializer));
assertTrue(cacheClient.containsKey("bulk-2", stringSerializer));
assertTrue(cacheClient.containsKey("bulk-3", stringSerializer));
assertTrue(cacheClient.containsKey("bulk-4", stringSerializer));
assertTrue(cacheClient.containsKey("bulk-5", stringSerializer));
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Routing to failure due to: " + e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(String value, OutputStream output) throws SerializationException, IOException {
if (value != null) {
output.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}
private static class StringDeserializer implements Deserializer<String> {
@Override
public String deserialize(byte[] input) throws DeserializationException {
return input == null ? null : new String(input, StandardCharsets.UTF_8);
}
}
private static class StringDeserializerWithoutNullCheck implements Deserializer<String> {
@Override
public String deserialize(byte[] input) throws DeserializationException {
return new String(input, StandardCharsets.UTF_8);
}
}
}