blob: a90d1235b0ea8ba083be768de6f60001d58fbd5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.rest.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorIndexingConfigService;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;
@Service
public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigService {
private ObjectMapper objectMapper;
private CuratorFramework client;
private ConfigurationsCache cache;
@Autowired
public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
this.objectMapper = objectMapper;
this.client = client;
this.cache = cache;
}
@Override
public Map<String, Object> save(String name, Map<String, Object> sensorIndexingConfig) throws RestException {
try {
ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(name, objectMapper.writeValueAsString(sensorIndexingConfig).getBytes(), client);
} catch (Exception e) {
throw new RestException(e);
}
return sensorIndexingConfig;
}
@Override
public Map<String, Object> findOne(String name) throws RestException {
IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
return configs.getSensorIndexingConfig(name, false);
}
@Override
public Map<String, Map<String, Object>> getAll() throws RestException {
Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
List<String> sensorNames = getAllTypes();
for (String name : sensorNames) {
Map<String, Object> config = findOne(name);
if(config != null) {
sensorIndexingConfigs.put(name, config);
}
}
return sensorIndexingConfigs;
}
@Override
public List<String> getAllTypes() throws RestException {
IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
return configs.getTypes();
}
/**
* Get a list of index names for a given writer (e.g. elasticsearch, solr, hdfs).
* This functions in the following way:
* * If an index config exists, then the index name will be returned. If unspecified, then the sensor name is used
* * If a parser exists and an index does NOT exist, then it will be included.
* * If the writer is disabled in the index config, then it will NOT be included.
* @param writerName The writer name to use
* @return An iterable of index names
* @throws RestException
*/
@Override
public Iterable<String> getAllIndices(String writerName) throws RestException {
if(StringUtils.isEmpty(writerName)) {
return Collections.emptyList();
}
IndexingConfigurations indexingConfigs = cache.get( IndexingConfigurations.class);
ParserConfigurations parserConfigs = cache.get( ParserConfigurations.class);
Set<String> ret = new HashSet<>();
for(String sensorName : Iterables.concat(parserConfigs.getTypes(), indexingConfigs.getTypes())) {
if(indexingConfigs.isEnabled(sensorName, writerName)) {
String indexName = indexingConfigs.getIndex(sensorName, writerName);
ret.add(indexName == null ? sensorName : indexName);
}
}
return ret;
}
@Override
public boolean delete(String name) throws RestException {
try {
client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/" + name);
} catch (KeeperException.NoNodeException e) {
return false;
} catch (Exception e) {
throw new RestException(e);
}
return true;
}
}