blob: 3f6aec8413c384bcbf30e21c2bf1c6437fb59499 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logsearch.config.local;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.json.JsonHelper;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
/**
* Watch specific config folder, and check against input.config-*.json file changes (create/update/remove),
* a change can trigger an input config monitor (which should start to monitor input files with the new or updated settings)
*/
public class LogSearchConfigLocalUpdater implements Runnable {
private static final Logger logger = LogManager.getLogger(LogSearchConfigLocalUpdater.class);
private final Path path;
private final WatchService watchService;
private final InputConfigMonitor inputConfigMonitor;
private final Map<String, String> inputFileContentsMap;
private final JsonParser parser;
private final JsonArray globalConfigNode;
private final Pattern serviceNamePattern;
public LogSearchConfigLocalUpdater(final Path path, final WatchService watchService,
final InputConfigMonitor inputConfigMonitor, final Map<String, String> inputFileContentsMap,
final JsonParser parser, final JsonArray globalConfigNode, final Pattern serviceNamePattern) {
this.path = path;
this.watchService = watchService;
this.inputConfigMonitor = inputConfigMonitor;
this.inputFileContentsMap = inputFileContentsMap;
this.parser = parser;
this.globalConfigNode = globalConfigNode;
this.serviceNamePattern = serviceNamePattern;
}
@Override
public void run() {
final Map<WatchKey, Path> keys = new ConcurrentHashMap<>();
try {
register(this.path, keys, watchService);
} catch (IOException e) {
logger.error("{}", e);
throw new RuntimeException(e);
}
while (!Thread.interrupted()) {
WatchKey key;
try {
key = watchService.poll(10, TimeUnit.SECONDS);
} catch (InterruptedException | ClosedWatchServiceException e) {
break;
}
if (key != null) {
Path path = keys.get(key);
for (WatchEvent<?> ev : key.pollEvents()) {
WatchEvent<Path> event = cast(ev);
WatchEvent.Kind<Path> kind = event.kind();
Path name = event.context();
Path monitoredInput = path.resolve(name);
File file = monitoredInput.toFile();
String absPath = monitoredInput.toAbsolutePath().toString();
if (file.getName().startsWith("input.config-") && file.getName().endsWith(".json")) {
Matcher m = serviceNamePattern.matcher(file.getName());
m.find();
String serviceName = m.group(1);
try {
if (kind == ENTRY_CREATE) {
logger.info("New input config entry found: {}", absPath);
String inputConfig = new String(Files.readAllBytes(monitoredInput));
JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
inputFileContentsMap.put(absPath, inputConfig);
} else if (kind == ENTRY_MODIFY) {
logger.info("Input config entry modified: {}", absPath);
if (inputFileContentsMap.containsKey(absPath)) {
String oldContent = inputFileContentsMap.get(absPath);
String inputConfig = new String(Files.readAllBytes(monitoredInput));
if (!inputConfig.equals(oldContent)) {
inputConfigMonitor.removeInputs(serviceName);
inputFileContentsMap.remove(absPath);
JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
inputFileContentsMap.put(absPath, inputConfig);
}
}
} else if (kind == ENTRY_DELETE) {
logger.info("Input config deleted: {}", absPath);
if (inputFileContentsMap.containsKey(absPath)) {
inputConfigMonitor.removeInputs(serviceName);
inputFileContentsMap.remove(absPath);
}
}
} catch (Exception e) {
logger.error("{}", e);
break;
}
}
}
if (!key.reset()) {
logger.info("{} is invalid", key);
keys.remove(key);
if (keys.isEmpty()) {
break;
}
}
}
}
}
private void register(Path dir, Map<WatchKey, Path> keys, WatchService watchService)
throws IOException {
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE,
ENTRY_MODIFY);
keys.put(key, dir);
}
@SuppressWarnings("unchecked")
private <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
}