blob: c460e3cb70a2d4bdef973764970b643f0e8a17a7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.rest.service.impl;
import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
import org.apache.metron.rest.service.SensorParserConfigService;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
import org.reflections.Reflections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SensorParserConfigServiceImpl implements SensorParserConfigService {
private ObjectMapper objectMapper;
private CuratorFramework client;
private ConfigurationsCache cache;
private GrokService grokService;
private Map<String, String> availableParsers;
@Autowired
public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client,
GrokService grokService, ConfigurationsCache cache) {
this.objectMapper = objectMapper;
this.client = client;
this.grokService = grokService;
this.cache = cache;
}
@Override
public SensorParserConfig save(String name, SensorParserConfig sensorParserConfig) throws RestException {
try {
ConfigurationsUtils.writeSensorParserConfigToZookeeper(name,
objectMapper.writeValueAsString(sensorParserConfig).getBytes(), client);
} catch (Exception e) {
throw new RestException(e);
}
return sensorParserConfig;
}
@Override
public SensorParserConfig findOne(String name) throws RestException {
ParserConfigurations configs = cache.get( ParserConfigurations.class);
return configs.getSensorParserConfig(name);
}
@Override
public Map<String, SensorParserConfig> getAll() throws RestException {
Map<String, SensorParserConfig> sensorParserConfigs = new HashMap<>();
List<String> sensorNames = getAllTypes();
for (String name : sensorNames) {
SensorParserConfig config = findOne(name);
if(config != null) {
sensorParserConfigs.put(name, config);
}
}
return sensorParserConfigs;
}
@Override
public boolean delete(String name) throws RestException {
try {
client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/" + name);
} catch (KeeperException.NoNodeException e) {
return false;
} catch (Exception e) {
throw new RestException(e);
}
return true;
}
@Override
public List<String> getAllTypes() throws RestException {
ParserConfigurations configs = cache.get( ParserConfigurations.class);
return configs.getTypes();
}
@Override
public Map<String, String> getAvailableParsers() {
if (availableParsers == null) {
availableParsers = new HashMap<>();
Set<Class<? extends MessageParser>> parserClasses = getParserClasses();
parserClasses.forEach(parserClass -> {
if (!"BasicParser".equals(parserClass.getSimpleName())) {
availableParsers.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""),
parserClass.getName());
}
});
}
return availableParsers;
}
@Override
public Map<String, String> reloadAvailableParsers() {
availableParsers = null;
return getAvailableParsers();
}
private Set<Class<? extends MessageParser>> getParserClasses() {
Reflections reflections = new Reflections("org.apache.metron.parsers");
return reflections.getSubTypesOf(MessageParser.class);
}
@Override
public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException {
SensorParserConfig sensorParserConfig = parseMessageRequest.getSensorParserConfig();
if (sensorParserConfig == null) {
throw new RestException("SensorParserConfig is missing from ParseMessageRequest");
} else if (sensorParserConfig.getParserClassName() == null) {
throw new RestException("SensorParserConfig must have a parserClassName");
} else {
MessageParser<JSONObject> parser;
try {
parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName())
.newInstance();
} catch (Exception e) {
throw new RestException(e.toString(), e.getCause());
}
Path temporaryGrokPath = null;
if (isGrokConfig(sensorParserConfig)) {
String name = parseMessageRequest.getSensorParserConfig().getSensorTopic();
temporaryGrokPath = grokService.saveTemporary(parseMessageRequest.getGrokStatement(), name);
sensorParserConfig.getParserConfig()
.put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString());
}
parser.configure(sensorParserConfig.getParserConfig());
parser.init();
JSONObject results = parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0);
if (isGrokConfig(sensorParserConfig) && temporaryGrokPath != null) {
grokService.deleteTemporary();
}
return results;
}
}
private boolean isGrokConfig(SensorParserConfig sensorParserConfig) {
return GROK_CLASS_NAME.equals(sensorParserConfig.getParserClassName());
}
}