| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.metron.hbase.coprocessor; |
| |
| import com.github.benmanes.caffeine.cache.CacheWriter; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.metron.common.configuration.EnrichmentConfigurations; |
| import org.apache.metron.enrichment.converter.EnrichmentKey; |
| import org.apache.metron.hbase.TableProvider; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.instanceOf; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.mockito.BDDMockito.willAnswer; |
| import static org.mockito.Mockito.*; |
| |
| public class EnrichmentCoprocessorTest { |
| |
| @Mock |
| private CacheWriter<String, String> cacheWriter; |
| @Mock |
| private RegionCoprocessorEnvironment copEnv; |
| @Mock |
| private ObserverContext<RegionCoprocessorEnvironment> observerContext; |
| private EnrichmentCoprocessor cop; |
| @Mock |
| private GlobalConfigService globalConfigService; |
| private Configuration config; |
| private static boolean instantiatedCustomTableProvider; |
| |
| @BeforeEach |
| public void setup() { |
| MockitoAnnotations.initMocks(this); |
| cop = new EnrichmentCoprocessor(cacheWriter, globalConfigService); |
| config = HBaseConfiguration.create(); |
| config.set(EnrichmentCoprocessor.ZOOKEEPER_URL, "foobar"); |
| when(copEnv.getConfiguration()).thenReturn(config); |
| instantiatedCustomTableProvider = false; |
| } |
| |
| @Test |
| public void cache_writes_only_on_first_cache_miss() throws Exception { |
| cop.start(copEnv); |
| String[] enrichTypes = new String[]{"foo", "bar", "baz", "metron"}; |
| final int putsPerType = 3; |
| Map<String, List<Put>> putsByType = simulateMultiplePutsPerType(putsPerType, enrichTypes); |
| int totalPuts = 0; |
| for (Map.Entry<String, List<Put>> entry : putsByType.entrySet()) { |
| String type = entry.getKey(); |
| List<Put> puts = entry.getValue(); |
| for (Put put : puts) { |
| cop.postPut(observerContext, put, null, null); |
| verify(cacheWriter, times(1)).write(eq(type), eq("{}")); |
| totalPuts++; |
| } |
| } |
| assertThat(totalPuts, equalTo(enrichTypes.length * putsPerType)); |
| } |
| |
| /** |
| * Generate a list of 'count' puts for each type in 'types'. |
| * |
| * @param count Number of puts to create per type |
| * @param types List of types to create the puts for. |
| * @return Map of types to a List of size 'count' puts |
| */ |
| private Map<String, List<Put>> simulateMultiplePutsPerType(int count, String... types) { |
| Map<String, List<Put>> putsByType = new HashMap<>(); |
| for (String type : types) { |
| List<Put> puts = putsByType.getOrDefault(type, new ArrayList<>()); |
| for (int i = 0; i < count; i++) { |
| EnrichmentKey ek = new EnrichmentKey(type, String.valueOf(i)); |
| puts.add(new Put(ek.toBytes())); |
| putsByType.put(type, puts); |
| } |
| } |
| return putsByType; |
| } |
| |
| public static class TestTableProvider implements TableProvider { |
| |
| public TestTableProvider() { |
| instantiatedCustomTableProvider = true; |
| } |
| |
| @Override |
| public Table getTable(Configuration config, String tableName) throws IOException { |
| return null; // not used for instantiation test |
| } |
| |
| } |
| |
| @Test |
| public void creates_tableprovider_from_config_property() throws Exception { |
| cop = new EnrichmentCoprocessor(globalConfigService); |
| Map<String, Object> globalConfig = new HashMap<String, Object>() {{ |
| put(EnrichmentConfigurations.TABLE_PROVIDER, TestTableProvider.class.getName()); |
| }}; |
| when(globalConfigService.get()).thenReturn(globalConfig); |
| cop.start(copEnv); |
| assertThat(instantiatedCustomTableProvider, equalTo(true)); |
| } |
| |
| @Test |
| public void bad_enrichment_key_exceptions_thrown_as_IOException() throws Exception { |
| cop.start(copEnv); |
| IOException e = assertThrows(IOException.class, |
| () -> cop.postPut(observerContext, new Put("foo".getBytes(StandardCharsets.UTF_8)), null, null)); |
| assertEquals("Error occurred while processing enrichment Put.", e.getMessage()); |
| assertThat(e.getCause(), instanceOf(RuntimeException.class)); |
| } |
| |
| @Test |
| public void general_exceptions_thrown_as_IOException() throws Exception { |
| Throwable cause = new Throwable("Bad things happened."); |
| // strictly speaking, this is a checked exception not in the api for CacheWriter, but it allows |
| // us to test catching all Throwable types |
| willAnswer(i -> { |
| throw cause; |
| }).given(cacheWriter).write(any(), any()); |
| cop.start(copEnv); |
| EnrichmentKey ek = new EnrichmentKey("foo", "bar"); |
| IOException e = assertThrows(IOException.class, |
| () -> cop.postPut(observerContext, new Put(ek.toBytes()), null, null)); |
| assertEquals("Error occurred while processing enrichment Put.", e.getMessage()); |
| assertEquals(e.getCause(), cause); |
| } |
| |
| } |