| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.dataloads.extractor; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.log4j.Logger; |
| import org.apache.metron.common.configuration.ConfigurationsUtils; |
| import org.apache.metron.common.dsl.Context; |
| import org.apache.metron.common.dsl.MapVariableResolver; |
| import org.apache.metron.common.dsl.StellarFunctions; |
| import org.apache.metron.common.stellar.StellarPredicateProcessor; |
| import org.apache.metron.common.stellar.StellarProcessor; |
| import org.apache.metron.common.utils.ConversionUtils; |
| import org.apache.metron.common.utils.JSONUtils; |
| import org.apache.metron.enrichment.lookup.LookupKV; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.util.*; |
| |
| import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*; |
| |
| public class TransformFilterExtractorDecorator extends ExtractorDecorator { |
| private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class); |
| |
| protected enum ExtractorOptions { |
| VALUE_TRANSFORM("value_transform"), |
| VALUE_FILTER("value_filter"), |
| INDICATOR_TRANSFORM("indicator_transform"), |
| INDICATOR_FILTER("indicator_filter"), |
| ZK_QUORUM("zk_quorum"), |
| INDICATOR("indicator"); |
| |
| private String key; |
| |
| ExtractorOptions(String key) { |
| this.key = key; |
| } |
| |
| @Override |
| public String toString() { |
| return key; |
| } |
| |
| public boolean existsIn(Map<String, Object> config) { |
| return config.containsKey(key); |
| } |
| } |
| |
| private Optional<CuratorFramework> zkClient; |
| private Map<String, String> valueTransforms; |
| private Map<String, String> indicatorTransforms; |
| private String valueFilter; |
| private String indicatorFilter; |
| private Context stellarContext; |
| private StellarProcessor transformProcessor; |
| private StellarPredicateProcessor filterProcessor; |
| private Map<String, Object> globalConfig; |
| |
| public TransformFilterExtractorDecorator(Extractor decoratedExtractor) { |
| super(decoratedExtractor); |
| this.zkClient = Optional.empty(); |
| this.valueTransforms = new LinkedHashMap<>(); |
| this.indicatorTransforms = new LinkedHashMap<>(); |
| this.valueFilter = ""; |
| this.indicatorFilter = ""; |
| this.transformProcessor = new StellarProcessor(); |
| this.filterProcessor = new StellarPredicateProcessor(); |
| } |
| |
| @Override |
| public void initialize(Map<String, Object> config) { |
| super.initialize(config); |
| if (VALUE_TRANSFORM.existsIn(config)) { |
| this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString()); |
| } |
| if (INDICATOR_TRANSFORM.existsIn(config)) { |
| this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString()); |
| } |
| if (VALUE_FILTER.existsIn(config)) { |
| this.valueFilter = getFilter(config, VALUE_FILTER.toString()); |
| } |
| if (INDICATOR_FILTER.existsIn(config)) { |
| this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString()); |
| } |
| String zkClientUrl = ""; |
| if (ZK_QUORUM.existsIn(config)) { |
| zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class); |
| } |
| zkClient = setupClient(zkClient, zkClientUrl); |
| this.globalConfig = getGlobalConfig(zkClient); |
| this.stellarContext = createContext(zkClient); |
| StellarFunctions.initialize(stellarContext); |
| this.transformProcessor = new StellarProcessor(); |
| this.filterProcessor = new StellarPredicateProcessor(); |
| } |
| |
| private String getFilter(Map<String, Object> config, String valueFilter) { |
| return (String) config.get(valueFilter); |
| } |
| |
| /** |
| * Get a map of the transformations from the config of the specified type |
| * @param config main config map |
| * @param type the transformation type to get from config |
| * @return map of transformations. |
| */ |
| private Map<String, String> getTransforms(Map<String, Object> config, String type) { |
| // If this isn't a Map of Strings, let an exception be thrown |
| @SuppressWarnings("unchecked") Map<Object, Object> transformsConfig = (Map) config.get(type); |
| Map<String, String> transforms = new LinkedHashMap<>(); |
| for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) { |
| transforms.put((String) e.getKey(), (String) e.getValue()); |
| } |
| return transforms; |
| } |
| |
| /** |
| * Creates a Zookeeper client if it doesn't exist and a url for zk is provided. |
| * @param zookeeperUrl The Zookeeper URL. |
| */ |
| private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) { |
| // can only create client if we have a valid zookeeper URL |
| if (!zkClient.isPresent()) { |
| if (StringUtils.isNotBlank(zookeeperUrl)) { |
| CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl); |
| client.start(); |
| return Optional.of(client); |
| } else { |
| LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**"); |
| return Optional.empty(); |
| } |
| } else { |
| return zkClient; |
| } |
| } |
| |
| private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) { |
| if (zkClient.isPresent()) { |
| try { |
| return JSONUtils.INSTANCE.load( |
| new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())), |
| new TypeReference<Map<String, Object>>() { |
| }); |
| } catch (Exception e) { |
| LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e); |
| } |
| } |
| return new LinkedHashMap<>(); |
| } |
| |
| private Context createContext(Optional<CuratorFramework> zkClient) { |
| Context.Builder builder = new Context.Builder(); |
| if (zkClient.isPresent()) { |
| builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get); |
| } |
| builder.with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig); |
| return builder.build(); |
| } |
| |
| @Override |
| public Iterable<LookupKV> extract(String line) throws IOException { |
| List<LookupKV> lkvs = new ArrayList<>(); |
| for (LookupKV lkv : super.extract(line)) { |
| if (updateLookupKV(lkv)) { |
| lkvs.add(lkv); |
| } |
| } |
| return lkvs; |
| } |
| |
| /** |
| * Returns true if lookupkv is not null after transforms and filtering on the value and indicator key |
| * @param lkv LookupKV to transform and filter |
| * @return true if lkv is not null after transform/filter |
| */ |
| private boolean updateLookupKV(LookupKV lkv) { |
| Map<String, Object> ret = lkv.getValue().getMetadata(); |
| Map<String, Object> ind = new LinkedHashMap<>(); |
| String indicator = lkv.getKey().getIndicator(); |
| // add indicator as a resolvable variable. Also enable using resolved/transformed variables and values from operating on the value metadata |
| ind.put(INDICATOR.toString(), indicator); |
| MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig); |
| transform(valueTransforms, ret, resolver); |
| transform(indicatorTransforms, ind, resolver); |
| // update indicator |
| Object updatedIndicator = ind.get(INDICATOR.toString()); |
| if (updatedIndicator != null) { |
| if (!(updatedIndicator instanceof String)) { |
| throw new UnsupportedOperationException("Indicator transform must return String type"); |
| } |
| lkv.getKey().setIndicator((String) updatedIndicator); |
| return filter(indicatorFilter, resolver) && filter(valueFilter, resolver); |
| } else { |
| return false; |
| } |
| } |
| |
| private void transform(Map<String, String> transforms, Map<String, Object> variableMap, MapVariableResolver variableResolver) { |
| for (Map.Entry<String, String> entry : transforms.entrySet()) { |
| Object o = transformProcessor.parse(entry.getValue(), variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext); |
| if (o == null) { |
| variableMap.remove(entry.getKey()); |
| } else { |
| variableMap.put(entry.getKey(), o); |
| } |
| } |
| } |
| |
| private Boolean filter(String filterPredicate, MapVariableResolver variableResolver) { |
| return filterProcessor.parse(filterPredicate, variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext); |
| } |
| |
| protected void setZkClient(Optional<CuratorFramework> zkClient) { |
| this.zkClient = zkClient; |
| } |
| |
| } |