| /* |
| * |
| * * Licensed to the Apache Software Foundation (ASF) under one or more |
| * * contributor license agreements. See the NOTICE file distributed with |
| * * this work for additional information regarding copyright ownership. |
| * * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * * (the "License"); you may not use this file except in compliance with |
| * * the License. You may obtain a copy of the License at |
| * * |
| * * http://www.apache.org/licenses/LICENSE-2.0 |
| * * |
| * * Unless required by applicable law or agreed to in writing, software |
| * * distributed under the License is distributed on an "AS IS" BASIS, |
| * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * * See the License for the specific language governing permissions and |
| * * limitations under the License. |
| * |
| */ |
| package org.apache.eagle.security.auditlog; |
| |
| import backtype.storm.task.OutputCollector; |
| import com.typesafe.config.Config; |
| import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity; |
| import org.apache.eagle.service.client.EagleServiceConnector; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.wso2.siddhi.core.ExecutionPlanRuntime; |
| import org.wso2.siddhi.core.SiddhiManager; |
| import org.wso2.siddhi.core.event.Event; |
| import org.wso2.siddhi.core.query.output.callback.QueryCallback; |
| import org.wso2.siddhi.core.stream.input.InputHandler; |
| |
| import java.util.*; |
| |
| public class HdfsUserCommandReassembler { |
| private static final Logger LOG = LoggerFactory.getLogger(HdfsUserCommandReassembler.class); |
| private Config config; |
| private InputHandler inputHandler; |
| /** |
| * event schema is attribute name/type pairs |
| */ |
| private final String streamName = "eventStream"; |
| public final static SortedMap<String, String> eventSchema = new TreeMap<String, String>(){{ |
| put("timestamp", AttributeType.LONG.name()); |
| put("src", AttributeType.STRING.name()); |
| put("dst", AttributeType.STRING.name()); |
| put("host", AttributeType.STRING.name()); |
| put("allowed", AttributeType.STRING.name()); |
| put("user", AttributeType.STRING.name()); |
| put("cmd", AttributeType.STRING.name()); |
| }}; |
| |
| public void prepareConfig(Config config) { |
| this.config = config; |
| } |
| |
| private static class GenericQueryCallback extends QueryCallback{ |
| private Map<String, String> outputSelector; |
| private Map<String, String> outputModifier; |
| public GenericQueryCallback(Map<String, String> outputSelector, Map<String, String> outputModifier){ |
| this.outputSelector = outputSelector; |
| this.outputModifier = outputModifier; |
| } |
| @Override |
| public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { |
| Object[] attrValues = inEvents[0].getData(); |
| OutputCollector collector = (OutputCollector) attrValues[0]; |
| SortedMap<String, Object> outputEvent = new TreeMap<String, Object>(); |
| int i = 1; // output is from second element |
| String user = null; |
| for(String attrKey : outputSelector.keySet()){ |
| Object v = attrValues[i++]; |
| outputEvent.put(attrKey, v); |
| if(attrKey.equals("user")) |
| user = (String)v; |
| } |
| |
| outputEvent.putAll(outputModifier); |
| LOG.debug("outputEvent: " + outputEvent); |
| collector.emit(Arrays.asList(user, outputEvent)); |
| } |
| } |
| |
| public String convertToStreamDef(String streamName, Map<String, String> eventSchema){ |
| StringBuilder sb = new StringBuilder(); |
| sb.append("context" + " object,"); |
| for(Map.Entry<String, String> entry : eventSchema.entrySet()){ |
| appendAttributeNameType(sb, entry.getKey(), entry.getValue()); |
| } |
| if(sb.length() > 0){ |
| sb.deleteCharAt(sb.length()-1); |
| } |
| |
| String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; |
| return String.format(siddhiStreamDefFormat, sb.toString()); |
| } |
| |
| private void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){ |
| sb.append(attrName); |
| sb.append(" "); |
| if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){ |
| sb.append("string"); |
| }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){ |
| sb.append("int"); |
| }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){ |
| sb.append("long"); |
| }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){ |
| sb.append("bool"); |
| }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){ |
| sb.append("float"); |
| }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){ |
| sb.append("double"); |
| }else{ |
| LOG.warn("AttrType is not recognized, ignore : " + attrType); |
| } |
| sb.append(","); |
| } |
| |
| public void init() { |
| String streamDef = convertToStreamDef(streamName, eventSchema); |
| SiddhiManager siddhiManager = new SiddhiManager(); |
| StringBuilder sb = new StringBuilder(); |
| sb.append(streamDef); |
| String readFrom = null; |
| try { |
| readFrom = config.getString("eagleProps.readHdfsUserCommandPatternFrom"); |
| }catch(Exception ex){ |
| LOG.warn("no config for readHdfsUserCommandPatternFrom", ex); |
| readFrom = "file"; |
| } |
| List<HdfsUserCommandPatternEntity> list = null; |
| try { |
| if (readFrom.equals("file")) { |
| list = new HdfsUserCommandPatternByFileImpl().findAllPatterns(); |
| } else { |
| list = new HdfsUserCommandPatternByDBImpl(new EagleServiceConnector(config)).findAllPatterns(); |
| } |
| }catch(Exception ex){ |
| LOG.error("fail reading hfdsUserCommandPattern", ex); |
| throw new IllegalStateException(ex); |
| } |
| for(HdfsUserCommandPatternEntity rule : list){ |
| sb.append(String.format("@info(name = '%s') from ", rule.getTags().get("userCommand"))); |
| sb.append(rule.getPattern()); |
| sb.append(" select a.context, "); |
| for(Map.Entry<String, String> entry : rule.getFieldSelector().entrySet()){ |
| sb.append(entry.getValue()); |
| sb.append(" as "); |
| sb.append(entry.getKey()); |
| sb.append(", "); |
| } |
| sb.deleteCharAt(sb.lastIndexOf(",")); |
| sb.append("insert into "); |
| sb.append(rule.getTags().get("userCommand")); |
| sb.append("_outputStream;"); |
| } |
| |
| LOG.info("patterns: " + sb.toString()); |
| ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(sb.toString()); |
| |
| for(HdfsUserCommandPatternEntity rule : list){ |
| executionPlanRuntime.addCallback(rule.getTags().get("userCommand"), new GenericQueryCallback(rule.getFieldSelector(), rule.getFieldModifier())); |
| } |
| |
| inputHandler = executionPlanRuntime.getInputHandler(streamName); |
| executionPlanRuntime.start(); |
| } |
| |
| public void flatMap(List<Object> input, OutputCollector collector) { |
| if(LOG.isDebugEnabled()) LOG.debug("incoming event:" + input.get(1)); |
| SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>) input.get(1); |
| SortedMap<String, Object> event = new TreeMap<>(toBeCopied); |
| Object[] siddhiEvent = convertToSiddhiEvent(collector, event); |
| try { |
| inputHandler.send(siddhiEvent); |
| } catch (Exception ex){ |
| LOG.error("Fail sending event to Siddhi pattern engine", ex); |
| throw new IllegalStateException(ex); |
| } |
| } |
| |
| public Object[] convertToSiddhiEvent(Object context, SortedMap<String, Object> event){ |
| Object[] siddhiEvent = new Object[1+event.size()]; |
| siddhiEvent[0] = context; // context |
| int i = 1; |
| for(Object value : event.values()){ |
| siddhiEvent[i++] = value; |
| } |
| return siddhiEvent; |
| } |
| } |