blob: 336abd00cea493de001144855f72f10c6369cf99 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.storm.integration;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.ZKServerComponent;
import java.util.Arrays;
import java.util.Properties;
import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile;
import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile;
import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
/**
* Uploads configuration to Zookeeper.
*/
public class ConfigUploadComponent implements InMemoryComponent {
private Properties topologyProperties;
private String globalConfiguration;
private String profilerConfigurationPath;
private ProfilerConfig profilerConfig;
@Override
public void start() throws UnableToStartException {
try {
upload();
} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
}
}
@Override
public void stop() {
// nothing to do
}
public void update()
throws UnableToStartException {
try {
upload();
} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
}
}
/**
* Uploads configuration to Zookeeper.
* @throws Exception
*/
private void upload() throws Exception {
final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
try(CuratorFramework client = getClient(zookeeperUrl)) {
if(client.getState() != CuratorFrameworkState.STARTED) {
client.start();
}
uploadGlobalConfig(client);
uploadProfilerConfig(client);
}
}
/**
* Upload the profiler configuration to Zookeeper.
* @param client The zookeeper client.
*/
private void uploadProfilerConfig(CuratorFramework client) throws Exception {
byte[] configBytes = null;
if (profilerConfigurationPath != null) {
configBytes = readProfilerConfigFromFile(profilerConfigurationPath);
} else if(profilerConfig != null) {
configBytes = profilerConfig.toJSON().getBytes(StandardCharsets.UTF_8);
}
if (ArrayUtils.getLength(configBytes) > 0) {
writeProfilerConfigToZookeeper(configBytes, client);
}
}
/**
* Upload the global configuration to Zookeeper.
* @param client The zookeeper client.
*/
private void uploadGlobalConfig(CuratorFramework client) throws Exception {
if (globalConfiguration != null) {
byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration);
if (globalConfig.length > 0) {
writeGlobalConfigToZookeeper(readGlobalConfigFromFile(globalConfiguration), client);
}
}
}
public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
this.topologyProperties = topologyProperties;
return this;
}
public ConfigUploadComponent withGlobalConfiguration(String path) {
this.globalConfiguration = path;
return this;
}
public ConfigUploadComponent withProfilerConfigurationPath(String path) {
this.profilerConfigurationPath = path;
return this;
}
public ConfigUploadComponent withProfilerConfiguration(ProfilerConfig profilerConfig) {
this.profilerConfig = profilerConfig;
return this;
}
}