blob: 382d8faec727de4f1538520c628db8345c6fe48c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logsearch.config.zookeeper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
 * ZooKeeper-backed implementation of {@link LogSearchConfig}.
 *
 * <p>{@link #init(Map)} opens a Curator client against the configured connect string (with the
 * configured root node appended), starts a {@link TreeCache} on the {@code /output} node and
 * prepares the {@link Gson} instance used to serialise log level filters. Input configs and log
 * level filters are written as UTF-8 encoded znodes below {@code /<cluster>/input} and
 * {@code /<cluster>/loglevelfilter}.
 */
public class LogSearchConfigZK implements LogSearchConfig {
  private static final Logger LOG = LoggerFactory.getLogger(LogSearchConfigZK.class);

  private static final int DEFAULT_SESSION_TIMEOUT = 60000;
  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
  /** Sleep between two connection retries, used by both retry policies. */
  private static final int RETRY_INTERVAL_MS = 10000;
  private static final String DEFAULT_ZK_ROOT = "/logsearch";
  /** Date pattern handed to Gson for (de)serialising dates. */
  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_connect_string",
    description = "ZooKeeper connection string.",
    examples = {"localhost1:2181,localhost2:2181/znode"},
    sources = {"logsearch.properties", "logfeeder.properties"}
  )
  private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_acls",
    description = "ZooKeeper ACLs for handling configs. (read & write)",
    examples = {"world:anyone:r,sasl:solr:cdrwa,sasl:logsearch:cdrwa"},
    sources = {"logsearch.properties", "logfeeder.properties"},
    defaultValue = "world:anyone:cdrwa"
  )
  private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_root",
    description = "ZooKeeper root node where the shippers are stored. (added to the connection string)",
    examples = {"/logsearch"},
    sources = {"logsearch.properties", "logfeeder.properties"}
  )
  private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_session_time_out_ms",
    description = "ZooKeeper session timeout in milliseconds",
    examples = {"60000"},
    sources = {"logsearch.properties", "logfeeder.properties"}
  )
  private static final String ZK_SESSION_TIMEOUT_PROPERTY = "logsearch.config.zk_session_time_out_ms";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_connection_time_out_ms",
    description = "ZooKeeper connection timeout in milliseconds",
    examples = {"30000"},
    sources = {"logsearch.properties", "logfeeder.properties"}
  )
  private static final String ZK_CONNECTION_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_time_out_ms";

  @LogSearchPropertyDescription(
    name = "logsearch.config.zk_connection_retry_time_out_ms",
    description = "The maximum elapsed time for connecting to ZooKeeper in milliseconds. 0 means retrying forever.",
    examples = {"1200000"},
    sources = {"logsearch.properties", "logfeeder.properties"}
  )
  private static final String ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY = "logsearch.config.zk_connection_retry_time_out_ms";

  /** Raw configuration handed to {@link #init(Map)}. */
  protected Map<String, String> properties;
  /** Curator client; created and started by {@link #init(Map)}. */
  protected CuratorFramework client;
  /** Cache over the {@code /output} node; created and started by {@link #init(Map)}. */
  protected TreeCache outputCache;
  /** Gson instance configured with {@link #DATE_FORMAT}. */
  protected Gson gson;

  /**
   * Connects to ZooKeeper and starts the {@code /output} cache.
   *
   * @param properties configuration; must contain {@code logsearch.config.zk_connect_string}
   * @throws IllegalArgumentException if the connect string property is missing or blank
   * @throws NumberFormatException if a timeout property is not a valid integer
   * @throws Exception if the cache cannot be started; anything opened so far is closed first
   */
  public void init(Map<String, String> properties) throws Exception {
    this.properties = properties;

    String zkHosts = properties.get(ZK_CONNECT_STRING_PROPERTY);
    if (StringUtils.isBlank(zkHosts)) {
      // Without this check the literal text "null" would be used as the host name.
      throw new IllegalArgumentException("Missing required property " + ZK_CONNECT_STRING_PROPERTY);
    }
    String root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
    String connectString = zkHosts + root;

    LOG.info("Connecting to ZooKeeper at {}", connectString);
    client = CuratorFrameworkFactory.builder()
        .connectString(connectString)
        .retryPolicy(getRetryPolicy(properties.get(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY)))
        .connectionTimeoutMs(getIntProperty(ZK_CONNECTION_TIMEOUT_PROPERTY, DEFAULT_CONNECTION_TIMEOUT))
        .sessionTimeoutMs(getIntProperty(ZK_SESSION_TIMEOUT_PROPERTY, DEFAULT_SESSION_TIMEOUT))
        .build();
    try {
      client.start();
      outputCache = new TreeCache(client, "/output");
      outputCache.start();
    } catch (Exception e) {
      // Do not leave a half-initialised client/cache behind when start-up fails.
      releaseResources();
      throw e;
    }

    gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
  }

  /**
   * Reads an integer property, falling back to {@code defaultValue} when it is absent or blank.
   *
   * @throws NumberFormatException if the value is present but not a valid integer
   */
  private int getIntProperty(String propertyKey, int defaultValue) {
    String rawValue = properties.get(propertyKey);
    if (StringUtils.isBlank(rawValue)) {
      return defaultValue;
    }
    return parseInt(propertyKey, rawValue);
  }

  /**
   * Chooses the connection retry policy: retry forever when the timeout is absent, blank or
   * {@code 0}, otherwise retry until the given number of milliseconds has elapsed.
   *
   * @throws NumberFormatException if the value is present but not a valid integer
   */
  private RetryPolicy getRetryPolicy(String zkConnectionRetryTimeoutValue) {
    if (StringUtils.isBlank(zkConnectionRetryTimeoutValue)) {
      return new RetryForever(RETRY_INTERVAL_MS);
    }
    int maxElapsedTimeMs = parseInt(ZK_CONNECTION_RETRY_TIMEOUT_PROPERTY, zkConnectionRetryTimeoutValue);
    if (maxElapsedTimeMs == 0) {
      return new RetryForever(RETRY_INTERVAL_MS);
    }
    return new RetryUntilElapsed(maxElapsedTimeMs, RETRY_INTERVAL_MS);
  }

  /** Parses a trimmed integer, naming the offending property in the error message. */
  private static int parseInt(String propertyKey, String rawValue) {
    try {
      return Integer.parseInt(rawValue.trim());
    } catch (NumberFormatException e) {
      // Keep the exception type callers may already expect, but add the property name.
      NumberFormatException detailed = new NumberFormatException(
          "Invalid integer value '" + rawValue + "' for property " + propertyKey);
      detailed.initCause(e);
      throw detailed;
    }
  }

  /**
   * Creates the znode {@code /<clusterName>/input/<serviceName>} holding {@code inputConfig}
   * encoded as UTF-8. An already existing node is left untouched.
   */
  @Override
  public void createInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
    String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
    try {
      client.create().creatingParentContainersIfNeeded().withACL(getAcls())
          .forPath(nodePath, inputConfig.getBytes(StandardCharsets.UTF_8));
      LOG.info("Uploaded input config for the service {} for cluster {}", serviceName, clusterName);
    } catch (NodeExistsException e) {
      // Expected when several uploaders race for the same node; the first one wins.
      LOG.debug("Did not upload input config for service {} as it was already uploaded by another Log Feeder",
          serviceName);
    }
  }

  /**
   * Creates the znode {@code /<clusterName>/loglevelfilter/<logId>} holding {@code filter}
   * serialised to JSON and encoded as UTF-8. An already existing node is left untouched.
   */
  @Override
  public void createLogLevelFilter(String clusterName, String logId, LogLevelFilter filter) throws Exception {
    String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, logId);
    String logLevelFilterJson = gson.toJson(filter);
    try {
      client.create().creatingParentContainersIfNeeded().withACL(getAcls())
          .forPath(nodePath, logLevelFilterJson.getBytes(StandardCharsets.UTF_8));
      LOG.info("Uploaded log level filter for the log {} for cluster {}", logId, clusterName);
    } catch (NodeExistsException e) {
      // Expected when several uploaders race for the same node; the first one wins.
      LOG.debug("Did not upload log level filters for log {} as it was already uploaded by another Log Feeder",
          logId);
    }
  }

  /**
   * Builds the ACL list from the comma separated {@code logsearch.config.zk_acls} property.
   *
   * <p>Each entry has the form {@code scheme:id:permissions}. The scheme ends at the first colon
   * and the permissions start after the last one, so ids that themselves contain colons (for
   * example {@code digest:user:hash:cdrwa}) are accepted. Entries that do not match this form
   * are skipped with a warning.
   *
   * @return the parsed ACLs, or {@link ZooDefs.Ids#OPEN_ACL_UNSAFE} when the property is blank
   * @throws IllegalArgumentException if an entry contains an unsupported permission character
   */
  protected List<ACL> getAcls() {
    String aclStr = properties.get(ZK_ACLS_PROPERTY);
    if (StringUtils.isBlank(aclStr)) {
      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }

    List<String> entries = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
    List<ACL> acls = new ArrayList<>(entries.size());
    for (int i = 0; i < entries.size(); i++) {
      String entry = entries.get(i);
      int firstColon = entry.indexOf(':');
      int lastColon = entry.lastIndexOf(':');
      boolean malformed = firstColon == -1 || firstColon == lastColon || lastColon == entry.length() - 1;
      if (malformed) {
        // The entry text is deliberately not logged: it may carry a digest credential.
        LOG.warn("Ignoring malformed entry #{} of {} (expected the form scheme:id:permissions)",
            i + 1, ZK_ACLS_PROPERTY);
        continue;
      }
      String scheme = entry.substring(0, firstColon);
      String id = entry.substring(firstColon + 1, lastColon);
      String permissions = entry.substring(lastColon + 1);
      acls.add(new ACL(parsePermission(permissions), new Id(scheme, id)));
    }
    return acls;
  }

  /**
   * Converts a permission string made of the letters {@code r}, {@code w}, {@code c}, {@code d}
   * and {@code a} (case-insensitive) into the matching {@link ZooDefs.Perms} bit mask.
   *
   * @throws IllegalArgumentException if the string contains any other character
   */
  private int parsePermission(String permission) {
    int permissionCode = 0;
    // Locale.ROOT keeps the case folding independent of the JVM's default locale.
    for (char each : permission.toLowerCase(Locale.ROOT).toCharArray()) {
      switch (each) {
        case 'r':
          permissionCode |= ZooDefs.Perms.READ;
          break;
        case 'w':
          permissionCode |= ZooDefs.Perms.WRITE;
          break;
        case 'c':
          permissionCode |= ZooDefs.Perms.CREATE;
          break;
        case 'd':
          permissionCode |= ZooDefs.Perms.DELETE;
          break;
        case 'a':
          permissionCode |= ZooDefs.Perms.ADMIN;
          break;
        default:
          throw new IllegalArgumentException("Unsupported permission: " + permission);
      }
    }
    return permissionCode;
  }

  /**
   * Closes the {@code /output} cache and the ZooKeeper client. Safe to call when
   * {@link #init(Map)} was never run or failed part-way.
   */
  @Override
  public void close() {
    LOG.info("Closing ZooKeeper Connection");
    releaseResources();
  }

  /** Closes the cache first and then the client it depends on; tolerates unset fields. */
  private void releaseResources() {
    if (outputCache != null) {
      try {
        outputCache.close();
      } catch (Exception e) {
        // Keep going so the client is still closed even if the cache fails to shut down.
        LOG.warn("Failed to close the ZooKeeper output cache", e);
      }
    }
    if (client != null) {
      client.close();
    }
  }
}