blob: b26181dd7f9f8a81a4204f207b5aa94a183e9739 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logsearch.config.zookeeper;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Utility functions for handling ZK operation and monitor ZK data for Log Search configuration
*/
public class LogSearchConfigZKHelper {
private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZKHelper.class);
private static final int DEFAULT_SESSION_TIMEOUT = 60000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
private static final int RETRY_INTERVAL_MS = 10000;
private static final String DEFAULT_ZK_ROOT = "/logsearch";
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_connect_string",
description = "ZooKeeper connection string.",
examples = {"localhost1:2181,localhost2:2181/znode"},
sources = {"logsearch.properties", "logfeeder.properties"}
)
private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_acls",
description = "ZooKeeper ACLs for handling configs. (read & write)",
examples = {"world:anyone:r,sasl:solr:cdrwa,sasl:logsearch:cdrwa"},
sources = {"logsearch.properties", "logfeeder.properties"},
defaultValue = "world:anyone:cdrwa"
)
private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_root",
description = "ZooKeeper root node where the shippers are stored. (added to the connection string)",
examples = {"/logsearch"},
sources = {"logsearch.properties", "logfeeder.properties"}
)
private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_session_time_out_ms",
description = "ZooKeeper session timeout in milliseconds",
examples = {"60000"},
sources = {"logsearch.properties", "logfeeder.properties"}
)
private static final String ZK_SESSION_TIMEOUT_PROPERTY = "logsearch.config.zk_session_time_out_ms";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_connection_time_out_ms",
description = "ZooKeeper connection timeout in milliseconds",
examples = {"30000"},
sources = {"logsearch.properties", "logfeeder.properties"}
)
private static final String ZK_CONNECTION_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_time_out_ms";
@LogSearchPropertyDescription(
name = "logsearch.config.zk_connection_retry_time_out_ms",
description = "The maximum elapsed time for connecting to ZooKeeper in milliseconds. 0 means retrying forever.",
examples = {"1200000"},
sources = {"logsearch.properties", "logfeeder.properties"}
)
private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";
private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
private LogSearchConfigZKHelper() {
}
/**
* Create ZK curator client from a configuration (map holds the configs for that)
*/
public static CuratorFramework createZKClient(Map<String, String> properties) {
String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY) + root);
return CuratorFrameworkFactory.builder()
.connectString(properties.get(ZK_CONNECT_STRING_PROPERTY) + root)
.retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
.connectionTimeoutMs(getIntProperty(properties, ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
.sessionTimeoutMs(getIntProperty(properties, ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
.build();
}
/**
* Get ACLs from a property (get the value then parse and transform it as ACL objects)
*/
public static List<ACL> getAcls(Map<String, String> properties) {
String aclStr = properties.get(ZK_ACLS_PROPERTY);
if (StringUtils.isBlank(aclStr)) {
return ZooDefs.Ids.OPEN_ACL_UNSAFE;
}
List<ACL> acls = new ArrayList<>();
List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
for (String unparcedAcl : aclStrList) {
String[] parts = unparcedAcl.split(":");
if (parts.length == 3) {
acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
}
}
return acls;
}
private static int getIntProperty(Map<String, String> properties, String propertyKey, int defaultValue) {
if (properties.get(propertyKey) == null)
return defaultValue;
return Integer.parseInt(properties.get(propertyKey));
}
private static RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
if (zkConnectionRetryTimeoutValue == null)
return new RetryForever(RETRY_INTERVAL_MS);
int maxElapsedTimeMs = Integer.parseInt(zkConnectionRetryTimeoutValue);
if (maxElapsedTimeMs == 0)
return new RetryForever(RETRY_INTERVAL_MS);
return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
}
/**
* Create listener for znode of log level filters - can be used for Log Feeder as it can be useful if it's monitoring the log level changes
*/
public static TreeCacheListener createTreeCacheListener(String clusterName, Gson gson, LogLevelFilterMonitor logLevelFilterMonitor) {
return new TreeCacheListener() {
private final Set<TreeCacheEvent.Type> nodeEvents = ImmutableSet.of(TreeCacheEvent.Type.NODE_ADDED, TreeCacheEvent.Type.NODE_UPDATED, TreeCacheEvent.Type.NODE_REMOVED);
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (!nodeEvents.contains(event.getType())) {
return;
}
String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
String nodeData = new String(event.getData().getData());
TreeCacheEvent.Type eventType = event.getType();
String configPathStab = String.format("/%s/", clusterName);
if (event.getData().getPath().startsWith(configPathStab + "loglevelfilter/")) {
handleLogLevelFilterChange(eventType, nodeName, nodeData, gson, logLevelFilterMonitor);
}
}
};
}
/**
* Create root + cluster name znode cache
*/
public static TreeCache createClusterCache(CuratorFramework client, String clusterName) {
return new TreeCache(client, String.format("/%s", clusterName));
}
/**
* Assign listener to cluster cache and start to use that listener
*/
public static void addAndStartListenersOnCluster(TreeCache clusterCache, TreeCacheListener listener) throws Exception {
clusterCache.getListenable().addListener(listener);
clusterCache.start();
}
public static void waitUntilRootAvailable(CuratorFramework client) throws Exception {
while (client.checkExists().forPath("/") == null) {
LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
}
}
/**
* Call log level filter monitor interface to handle node related operations (on update/remove)
*/
public static void handleLogLevelFilterChange(final TreeCacheEvent.Type eventType, final String nodeName, final String nodeData,
final Gson gson, final LogLevelFilterMonitor logLevelFilterMonitor) {
switch (eventType) {
case NODE_ADDED:
case NODE_UPDATED:
LOG.info("Node added/updated under loglevelfilter ZK node: " + nodeName);
LogLevelFilter logLevelFilter = gson.fromJson(nodeData, LogLevelFilter.class);
logLevelFilterMonitor.setLogLevelFilter(nodeName, logLevelFilter);
break;
case NODE_REMOVED:
LOG.info("Node removed loglevelfilter input ZK node: " + nodeName);
logLevelFilterMonitor.removeLogLevelFilter(nodeName);
break;
default:
break;
}
}
/**
* Pares ZK ACL permission string and transform it to an integer
*/
public static Integer parsePermission(String permission) {
int permissionCode = 0;
for (char each : permission.toLowerCase().toCharArray()) {
switch (each) {
case 'r':
permissionCode |= ZooDefs.Perms.READ;
break;
case 'w':
permissionCode |= ZooDefs.Perms.WRITE;
break;
case 'c':
permissionCode |= ZooDefs.Perms.CREATE;
break;
case 'd':
permissionCode |= ZooDefs.Perms.DELETE;
break;
case 'a':
permissionCode |= ZooDefs.Perms.ADMIN;
break;
default:
throw new IllegalArgumentException("Unsupported permission: " + permission);
}
}
return permissionCode;
}
public static Gson createGson() {
return new GsonBuilder().setDateFormat(DATE_FORMAT).create();
}
}