| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.standard; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.distributed.cache.client.Deserializer; |
| import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; |
| import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; |
| import org.apache.nifi.distributed.cache.client.Serializer; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.reporting.InitializationException; |
| import org.apache.nifi.state.MockStateManager; |
| import org.apache.nifi.util.MockControllerServiceInitializationContext; |
| import org.apache.nifi.util.MockComponentLog; |
| import org.apache.nifi.util.TestRunner; |
| import org.apache.nifi.util.TestRunners; |
| import org.junit.Test; |
| |
| public class TestDetectDuplicate { |
| |
| static { |
| System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); |
| System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug"); |
| System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug"); |
| } |
| |
| @Test |
| public void testDuplicate() throws InitializationException { |
| final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); |
| final DistributedMapCacheClientImpl client = createClient(); |
| final Map<String, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); |
| runner.addControllerService("client", client, clientProperties); |
| runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); |
| runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); |
| runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); |
| final Map<String, String> props = new HashMap<>(); |
| props.put("hash.value", "1000"); |
| runner.enqueue(new byte[]{}, props); |
| runner.enableControllerService(client); |
| |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.clearTransferState(); |
| runner.enqueue(new byte[]{}, props); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); |
| runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0); |
| runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); |
| } |
| |
| @Test |
| public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException { |
| |
| final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); |
| final DistributedMapCacheClientImpl client = createClient(); |
| final Map<String, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); |
| runner.addControllerService("client", client, clientProperties); |
| runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); |
| runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); |
| runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); |
| runner.enableControllerService(client); |
| |
| final Map<String, String> props = new HashMap<>(); |
| props.put("hash.value", "1000"); |
| runner.enqueue(new byte[]{}, props); |
| |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.clearTransferState(); |
| Thread.sleep(3000); |
| runner.enqueue(new byte[]{}, props); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); |
| runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); |
| } |
| |
| private DistributedMapCacheClientImpl createClient() throws InitializationException { |
| |
| final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); |
| final ComponentLog logger = new MockComponentLog("client", client); |
| final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client)); |
| client.initialize(clientInitContext); |
| |
| return client; |
| } |
| |
| @Test |
| public void testDuplicateNoCache() throws InitializationException { |
| final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); |
| final DistributedMapCacheClientImpl client = createClient(); |
| final Map<String, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); |
| runner.addControllerService("client", client, clientProperties); |
| runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); |
| runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); |
| runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); |
| runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); |
| final Map<String, String> props = new HashMap<>(); |
| props.put("hash.value", "1000"); |
| runner.enqueue(new byte[]{}, props); |
| runner.enableControllerService(client); |
| |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.clearTransferState(); |
| |
| runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "true"); |
| runner.enqueue(new byte[]{}, props); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); |
| runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); |
| runner.clearTransferState(); |
| |
| runner.enqueue(new byte[]{}, props); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); |
| runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0); |
| runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); |
| } |
| |
| @Test |
| public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException { |
| |
| final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); |
| final DistributedMapCacheClientImpl client = createClient(); |
| final Map<String, String> clientProperties = new HashMap<>(); |
| clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); |
| runner.addControllerService("client", client, clientProperties); |
| runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); |
| runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); |
| runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); |
| runner.enableControllerService(client); |
| |
| final Map<String, String> props = new HashMap<>(); |
| props.put("hash.value", "1000"); |
| runner.enqueue(new byte[]{}, props); |
| |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| |
| runner.clearTransferState(); |
| Thread.sleep(3000); |
| |
| runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); |
| runner.enqueue(new byte[]{}, props); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); |
| runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); |
| runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); |
| } |
| |
| static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { |
| |
| boolean exists = false; |
| private Object cacheValue; |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { |
| } |
| |
| @Override |
| protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| final List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(DistributedMapCacheClientService.HOSTNAME); |
| props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT); |
| props.add(DistributedMapCacheClientService.PORT); |
| props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE); |
| return props; |
| } |
| |
| @Override |
| public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| if (exists) { |
| return false; |
| } |
| |
| cacheValue = value; |
| exists = true; |
| return true; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, |
| final Deserializer<V> valueDeserializer) throws IOException { |
| if (exists) { |
| return (V) cacheValue; |
| } |
| cacheValue = value; |
| exists = true; |
| return null; |
| } |
| |
| @Override |
| public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { |
| return exists; |
| } |
| |
| @Override |
| public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { |
| if (exists) { |
| return (V) cacheValue; |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { |
| exists = false; |
| return true; |
| } |
| |
| @Override |
| public long removeByPattern(String regex) throws IOException { |
| if (exists) { |
| exists = false; |
| return 1L; |
| } else { |
| return 0L; |
| } |
| } |
| |
| @Override |
| public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { |
| cacheValue = value; |
| exists = true; |
| } |
| } |
| } |