| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.redis.service; |
| |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnDisabled; |
| import org.apache.nifi.annotation.lifecycle.OnEnabled; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; |
| import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; |
| import org.apache.nifi.distributed.cache.client.Deserializer; |
| import org.apache.nifi.distributed.cache.client.Serializer; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.redis.RedisConnectionPool; |
| import org.apache.nifi.redis.RedisType; |
| import org.apache.nifi.redis.util.RedisAction; |
| import org.apache.nifi.util.Tuple; |
| import org.springframework.data.redis.connection.RedisConnection; |
| import org.springframework.data.redis.core.Cursor; |
| import org.springframework.data.redis.core.ScanOptions; |
| import org.springframework.data.redis.core.types.Expiration; |
| import org.springframework.data.redis.connection.RedisStringCommands.SetOption; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| @Tags({ "redis", "distributed", "cache", "map" }) |
| @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + |
| "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " + |
| "can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " + |
| "provide high-availability configurations.") |
| public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> { |
| |
| public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder() |
| .name("redis-connection-pool") |
| .displayName("Redis Connection Pool") |
| .identifiesControllerService(RedisConnectionPool.class) |
| .required(true) |
| .build(); |
| |
| public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() |
| .name("redis-cache-ttl") |
| .displayName("TTL") |
| .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever") |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .required(true) |
| .defaultValue("0 secs") |
| .build(); |
| |
| static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; |
| static { |
| final List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(REDIS_CONNECTION_POOL); |
| props.add(TTL); |
| PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); |
| } |
| |
| private volatile RedisConnectionPool redisConnectionPool; |
| private Long ttl; |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return PROPERTY_DESCRIPTORS; |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { |
| final List<ValidationResult> results = new ArrayList<>(); |
| |
| final RedisConnectionPool redisConnectionPool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); |
| if (redisConnectionPool != null) { |
| final RedisType redisType = redisConnectionPool.getRedisType(); |
| if (redisType != null && redisType == RedisType.CLUSTER) { |
| results.add(new ValidationResult.Builder() |
| .subject(REDIS_CONNECTION_POOL.getDisplayName()) |
| .valid(false) |
| .explanation(REDIS_CONNECTION_POOL.getDisplayName() |
| + " is configured in clustered mode, and this service requires a non-clustered Redis") |
| .build()); |
| } |
| } |
| |
| return results; |
| } |
| |
| @OnEnabled |
| public void onEnabled(final ConfigurationContext context) { |
| this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); |
| this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS); |
| |
| if (ttl == 0) { |
| this.ttl = -1L; |
| } |
| } |
| |
| @OnDisabled |
| public void onDisabled() { |
| this.redisConnectionPool = null; |
| } |
| |
| @Override |
| public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); |
| boolean set = redisConnection.setNX(kv.getKey(), kv.getValue()); |
| |
| if (ttl != -1L && set) { |
| redisConnection.expire(kv.getKey(), ttl); |
| } |
| |
| return set; |
| }); |
| } |
| |
| @Override |
| public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); |
| do { |
| // start a watch on the key and retrieve the current value |
| redisConnection.watch(kv.getKey()); |
| final byte[] existingValue = redisConnection.get(kv.getKey()); |
| |
| // start a transaction and perform the put-if-absent |
| redisConnection.multi(); |
| redisConnection.setNX(kv.getKey(), kv.getValue()); |
| |
| // Set the TTL only if the key doesn't exist already |
| if (ttl != -1L && existingValue == null) { |
| redisConnection.expire(kv.getKey(), ttl); |
| } |
| |
| // execute the transaction |
| final List<Object> results = redisConnection.exec(); |
| |
| // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry |
| // if the results list was null, then the transaction failed |
| // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation |
| if (results != null && results.size() > 0) { |
| final Object firstResult = results.get(0); |
| if (firstResult instanceof Boolean) { |
| final Boolean absent = (Boolean) firstResult; |
| return absent ? null : valueDeserializer.deserialize(existingValue); |
| } else { |
| // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop |
| throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " |
| + firstResult.getClass().getName() + " with value " + firstResult.toString()); |
| } |
| } |
| } while (isEnabled()); |
| |
| return null; |
| }); |
| } |
| |
| @Override |
| public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final byte[] k = serialize(key, keySerializer); |
| return redisConnection.exists(k); |
| }); |
| } |
| |
| @Override |
| public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| withConnection(redisConnection -> { |
| final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); |
| redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), SetOption.upsert()); |
| return null; |
| }); |
| } |
| |
| @Override |
| public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { |
| withConnection(redisConnection -> { |
| Map<byte[], byte[]> values = new HashMap<>(); |
| for (Map.Entry<K, V> entry : keysAndValues.entrySet()) { |
| final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); |
| values.put(kv.getKey(), kv.getValue()); |
| } |
| |
| if (getLogger().isDebugEnabled()) { |
| getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size())); |
| } |
| |
| if (!values.isEmpty()) { |
| redisConnection.mSet(values); |
| if (ttl != -1L) { |
| values.keySet().forEach(k -> redisConnection.expire(k, ttl)); |
| } |
| } |
| return null; |
| }); |
| } |
| |
| @Override |
| public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final byte[] k = serialize(key, keySerializer); |
| final byte[] v = redisConnection.get(k); |
| return v == null ? null : valueDeserializer.deserialize(v); |
| }); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| // nothing to do |
| } |
| |
| @Override |
| public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final byte[] k = serialize(key, keySerializer); |
| final long numRemoved = redisConnection.del(k); |
| return numRemoved > 0; |
| }); |
| } |
| |
| @Override |
| public long removeByPattern(final String regex) throws IOException { |
| return withConnection(redisConnection -> { |
| long deletedCount = 0; |
| final List<byte[]> batchKeys = new ArrayList<>(); |
| |
| // delete keys in batches of 1000 using the cursor |
| final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build()); |
| while (cursor.hasNext()) { |
| batchKeys.add(cursor.next()); |
| |
| if (batchKeys.size() == 1000) { |
| deletedCount += redisConnection.del(getKeys(batchKeys)); |
| batchKeys.clear(); |
| } |
| } |
| |
| // delete any left-over keys if some were added to the batch but never reached 1000 |
| if (batchKeys.size() > 0) { |
| deletedCount += redisConnection.del(getKeys(batchKeys)); |
| batchKeys.clear(); |
| } |
| |
| return deletedCount; |
| }); |
| } |
| |
| /** |
| * Convert the list of all keys to an array. |
| */ |
| private byte[][] getKeys(final List<byte[]> keys) { |
| final byte[][] allKeysArray = new byte[keys.size()][]; |
| for (int i=0; i < keys.size(); i++) { |
| allKeysArray[i] = keys.get(i); |
| } |
| return allKeysArray; |
| } |
| |
| // ----------------- Methods from AtomicDistributedMapCacheClient ------------------------ |
| |
| @Override |
| public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final byte[] k = serialize(key, keySerializer); |
| |
| final byte[] v = redisConnection.get(k); |
| if (v == null) { |
| return null; |
| } |
| |
| // for Redis we are going to use the raw bytes of the value as the revision |
| return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(v), v); |
| }); |
| } |
| |
| @Override |
| public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| return withConnection(redisConnection -> { |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| |
| keySerializer.serialize(entry.getKey(), out); |
| final byte[] k = out.toByteArray(); |
| out.reset(); |
| |
| valueSerializer.serialize(entry.getValue(), out); |
| final byte[] newVal = out.toByteArray(); |
| |
| // the revision of the cache entry holds the value of the key from a previous fetch |
| final byte[] prevVal = entry.getRevision().orElse(null); |
| |
| boolean replaced = false; |
| |
| // start a watch on the key and retrieve the current value |
| redisConnection.watch(k); |
| final byte[] currValue = redisConnection.get(k); |
| |
| // start a transaction |
| redisConnection.multi(); |
| |
| // compare-and-set |
| if (Arrays.equals(prevVal, currValue)) { |
| // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true, |
| // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded |
| redisConnection.getSet(k, newVal); |
| |
| // set the TTL if specified |
| if (ttl != -1L) { |
| redisConnection.expire(k, ttl); |
| } |
| } |
| |
| // execute the transaction |
| final List<Object> results = redisConnection.exec(); |
| |
| // if we have a result then the replace succeeded |
| if (results != null && results.size() > 0) { |
| replaced = true; |
| } |
| |
| return replaced; |
| }); |
| } |
| |
| // ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------ |
| |
| private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| |
| keySerializer.serialize(key, out); |
| final byte[] k = out.toByteArray(); |
| |
| out.reset(); |
| |
| valueSerializer.serialize(value, out); |
| final byte[] v = out.toByteArray(); |
| |
| return new Tuple<>(k, v); |
| } |
| |
| private <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException { |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| |
| keySerializer.serialize(key, out); |
| return out.toByteArray(); |
| } |
| |
| private <T> T withConnection(final RedisAction<T> action) throws IOException { |
| RedisConnection redisConnection = null; |
| try { |
| redisConnection = redisConnectionPool.getConnection(); |
| return action.execute(redisConnection); |
| } finally { |
| if (redisConnection != null) { |
| try { |
| redisConnection.close(); |
| } catch (Exception e) { |
| getLogger().warn("Error closing connection: " + e.getMessage(), e); |
| } |
| } |
| } |
| } |
| |
| } |