blob: 9f2d8cd4646c0a2833de7f9e01722418d212d805 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.scheduler.utils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
/**
* A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory (an artifact repository manager).
* This is not thread-safe.
*/
public class ArtifactoryConfigLoader implements IConfigLoader {
protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
static final String cacheFilename = "latest.yaml";
private static final String DEFAULT_ARTIFACTORY_BASE_DIRECTORY = "/artifactory";
private static final int DEFAULT_POLLTIME_SECS = 600;
private static final int DEFAULT_TIMEOUT_SECS = 10;
private static final String ARTIFACTORY_SCHEME_PREFIX = "artifactory+";
private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class);
private Map<String, Object> conf;
private int artifactoryPollTimeSecs = DEFAULT_POLLTIME_SECS;
private boolean cacheInitialized = false;
// Location of the file in the artifactory archive. Also used to name file in cache.
private String localCacheDir;
private String baseDirectory = DEFAULT_ARTIFACTORY_BASE_DIRECTORY;
private int lastReturnedTime = 0;
private int timeoutSeconds = DEFAULT_TIMEOUT_SECS;
private Map<String, Object> lastReturnedValue;
private URI targetUri = null;
private JSONParser jsonParser;
private String scheme;
public ArtifactoryConfigLoader(Map<String, Object> conf) {
this.conf = conf;
Integer thisTimeout = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS);
if (thisTimeout != null) {
timeoutSeconds = thisTimeout;
}
Integer thisPollTime = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS);
if (thisPollTime != null) {
artifactoryPollTimeSecs = thisPollTime;
}
String thisBase = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY);
if (thisBase != null) {
baseDirectory = thisBase;
}
String uriString = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
if (uriString == null) {
LOG.error("No URI defined in {} configuration.", DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
} else {
try {
targetUri = new URI(uriString);
scheme = targetUri.getScheme().substring(ARTIFACTORY_SCHEME_PREFIX.length());
} catch (URISyntaxException e) {
LOG.error("Failed to parse uri={}", uriString);
}
}
jsonParser = new JSONParser();
}
/**
* Load the configs associated with the configKey from the targetURI.
* @param configKey The key from which we want to get the scheduler config.
* @return The scheduler configuration if exists; null otherwise.
*/
@Override
public Map<String, Object> load(String configKey) {
if (targetUri == null) {
return null;
}
// Check for new file every so often
int currentTimeSecs = Time.currentTimeSecs();
if (lastReturnedValue != null && ((currentTimeSecs - lastReturnedTime) < artifactoryPollTimeSecs)) {
LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; artifactoryPollTimeSecs: {}. Returning our last map.",
currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs);
return (Map<String, Object>) lastReturnedValue.get(configKey);
}
try {
Map<String, Object> raw = loadFromUri(targetUri);
if (raw != null) {
return (Map<String, Object>) raw.get(configKey);
}
} catch (Exception e) {
LOG.error("Failed to load from uri {}", targetUri);
}
return null;
}
/**
* Protected so we can override this in unit tests.
*
* @param api null if we are trying to download artifact, otherwise a string to call REST api,
* e.g. "/api/storage"
* @param artifact location of artifact
* @param host Artifactory hostname
* @param port Artifactory port
* @return null on failure or the response string if return code is in 200 range
*/
protected String doGet(String api, String artifact, String host, Integer port) {
URIBuilder builder = new URIBuilder().setScheme(scheme).setHost(host).setPort(port);
String path = null;
if (api != null) {
path = baseDirectory + "/" + api + "/" + artifact;
} else {
path = baseDirectory + "/" + artifact;
}
// Get rid of multiple '/' in url
path = path.replaceAll("/[/]+", "/");
builder.setPath(path);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(timeoutSeconds * 1000).build();
HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
String returnValue;
try {
LOG.debug("About to issue a GET to {}", builder);
HttpGet httpget = new HttpGet(builder.build());
String responseBody;
responseBody = httpclient.execute(httpget, GetStringResponseHandler.getInstance());
returnValue = responseBody;
} catch (Exception e) {
LOG.error("Received exception while connecting to Artifactory", e);
returnValue = null;
}
LOG.debug("Returning {}", returnValue);
return returnValue;
}
private JSONObject getArtifactMetadata(String location, String host, Integer port) {
String metadataStr = null;
metadataStr = doGet("/api/storage", location, host, port);
if (metadataStr == null) {
return null;
}
JSONObject returnValue;
try {
returnValue = (JSONObject) jsonParser.parse(metadataStr);
} catch (ParseException e) {
LOG.error("Could not parse JSON string {}", metadataStr, e);
return null;
}
return returnValue;
}
private String loadMostRecentArtifact(String location, String host, Integer port) {
// Is this a directory or is it a file?
JSONObject json = getArtifactMetadata(location, host, port);
if (json == null) {
LOG.error("got null metadata");
return null;
}
String downloadUri = (String) json.get("downloadUri");
// This means we are pointing at a file.
if (downloadUri != null) {
// Then get it and return the file as string.
String returnValue = doGet(null, location, host, port);
saveInArtifactoryCache(returnValue);
return returnValue;
}
// This should mean that we were pointed at a directory.
// Find the most recent child and load that.
JSONArray msg = (JSONArray) json.get("children");
if (msg == null || msg.size() == 0) {
LOG.error("Expected directory children not present");
return null;
}
JSONObject newest = (JSONObject) Collections.max(msg, new DirEntryCompare());
if (newest == null) {
LOG.error("Failed to find most recent artifact uri in {}", location);
return null;
}
String uri = (String) newest.get("uri");
if (uri == null) {
LOG.error("Expected directory uri not present");
return null;
}
String returnValue = doGet(null, location + uri, host, port);
saveInArtifactoryCache(returnValue);
return returnValue;
}
private void updateLastReturned(Map ret) {
lastReturnedTime = Time.currentTimeSecs();
lastReturnedValue = ret;
}
private Map<String, Object> loadFromFile(File file) {
Map<String, Object> ret = null;
try {
ret = (Map<String, Object>) Utils.readYamlFile(file.getCanonicalPath());
} catch (IOException e) {
LOG.error("Filed to load from file. Exception: {}", e.getMessage());
}
if (ret != null) {
try {
LOG.debug("returning a new map from file {}", file.getCanonicalPath());
} catch (java.io.IOException e) {
LOG.debug("Could not get PATH from file object in debug print. Ignoring");
}
return ret;
}
return null;
}
private Map<String, Object> getLatestFromCache() {
String localFileName = localCacheDir + File.separator + cacheFilename;
return loadFromFile(new File(localFileName));
}
private void saveInArtifactoryCache(String yamlData) {
if (yamlData == null) {
LOG.warn("Will not save null data into the artifactory cache");
return;
}
String localFileName = localCacheDir + File.separator + cacheFilename;
File cacheFile = new File(localFileName);
try (FileOutputStream fos = new FileOutputStream(cacheFile)) {
fos.write(yamlData.getBytes());
fos.flush();
} catch (IOException e) {
LOG.error("Received exception when writing file {}. Attempting delete", localFileName, e);
try {
cacheFile.delete();
} catch (Exception deleteException) {
LOG.error("Received exception when deleting file {}.", localFileName, deleteException);
}
}
}
private void makeArtifactoryCache(String location) throws IOException {
// First make the cache dir
String localDirName = ServerConfigUtils.masterLocalDir(conf) + File.separator + LOCAL_ARTIFACT_DIR;
File dir = new File(localDirName);
if (!dir.exists()) {
dir.mkdirs();
}
localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_");
dir = new File(localCacheDir);
if (!dir.exists()) {
dir.mkdir();
}
cacheInitialized = true;
}
private Map<String, Object> loadFromUri(URI uri) throws IOException {
String host = uri.getHost();
Integer port = uri.getPort();
String location = uri.getPath();
if (location.toLowerCase().startsWith(baseDirectory.toLowerCase())) {
location = location.substring(baseDirectory.length());
}
if (!cacheInitialized) {
makeArtifactoryCache(location);
}
// Get the most recent artifact as a String, and then parse the yaml
String yamlConfig = loadMostRecentArtifact(location, host, port);
// If we failed to get anything from Artifactory try to get it from our local cache
if (yamlConfig == null) {
Map<String, Object> ret = getLatestFromCache();
updateLastReturned(ret);
return ret;
}
// Now parse it and return the map.
Yaml yaml = new Yaml(new SafeConstructor());
Map ret = null;
try {
ret = (Map) yaml.load(yamlConfig);
} catch (Exception e) {
LOG.error("Could not parse yaml.");
return null;
}
if (ret != null) {
LOG.debug("returning a new map from Artifactory");
updateLastReturned(ret);
return ret;
}
return null;
}
/**
* A private class used to check the response coming back from httpclient.
*/
private static class GetStringResponseHandler implements ResponseHandler<String> {
private static GetStringResponseHandler singleton = null;
/**
* Get instance.
* @return a singleton httpclient GET response handler
*/
public static GetStringResponseHandler getInstance() {
if (singleton == null) {
singleton = new GetStringResponseHandler();
}
return singleton;
}
/**
* Handle response.
* @param response The http response to verify.
* @return null on failure or the response string if return code is in 200 range
*/
@Override
public String handleResponse(final HttpResponse response) throws IOException {
int status = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
String entityString = (entity != null ? EntityUtils.toString(entity) : null);
if (status >= 200 && status < 300) {
return entityString;
} else {
LOG.error("Got unexpected response code {}; entity: {}", status, entityString);
return null;
}
}
}
private class DirEntryCompare implements Comparator<JSONObject> {
@Override
public int compare(JSONObject o1, JSONObject o2) {
return ((String) o1.get("uri")).compareTo((String) o2.get("uri"));
}
}
}