blob: 86287ae933a7363b69c5fefb3b3ada5e1c391f0b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
#define __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
#include <list>
#include <string>
#include <tuple>
#include <csi/spec.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/ssl/flags.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/hashmap.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include "resource_provider/storage/disk_profile_utils.hpp"
namespace mesos {
namespace internal {
namespace storage {
// Forward declaration.
class UriDiskProfileAdaptorProcess;
// The `UriDiskProfileAdaptor` is an example DiskProfileAdaptor module
// that takes a URI as a module parameter and fetches that URI
// periodically. The fetched data is parsed into the required CSI
// protobufs (which also acts as validation).
//
// If there is an error during fetching, any previously fetched results
// will be used until fetching is successful.
//
// This module does not filter return results based on
// `CSIPluginInfo::type` and assumes that all fetched profiles are meant
// for all resource providers.
//
// See `UriDiskProfileAdaptor::Flags` below for more information.
class UriDiskProfileAdaptor : public DiskProfileAdaptor
{
public:
struct Flags : public virtual flags::FlagsBase
{
Flags()
{
add(&Flags::uri,
"uri",
None(),
"URI to a JSON object containing the disk profile mapping.\n"
"This module supports both HTTP(s) and file URIs\n."
"\n"
"The JSON object should consist of some top-level string keys\n"
"corresponding to the disk profile name. Each value should contain\n"
"a `ResourceProviderSelector` under 'resource_provider_selector' or\n"
"a `CSIPluginTypeSelector` under 'csi_plugin_type_selector' to\n"
"specify the set of resource providers this profile applies to,\n"
"followed by a `VolumeCapability` under 'volume_capabilities'\n"
"and a free-form string-string mapping under 'create_parameters'.\n"
"\n"
"The JSON is modeled after a protobuf found in\n"
"`src/resource_provider/storage/disk_profile.proto`.\n"
"\n"
"For example:\n"
"{\n"
" \"profile_matrix\" : {\n"
" \"my-profile\" : {\n"
" \"csi_plugin_type_selector\": {\n"
" \"plugin_type\" : \"org.apache.mesos.csi.test\"\n"
" \"},\n"
" \"volume_capabilities\" : {\n"
" \"block\" : {},\n"
" \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n"
" },\n"
" \"create_parameters\" : {\n"
" \"mesos-does-not\" : \"interpret-these\",\n"
" \"type\" : \"raid5\",\n"
" \"stripes\" : \"3\",\n"
" \"stripesize\" : \"64\"\n"
" }\n"
" }\n"
" }\n"
"}",
static_cast<const Path*>(nullptr),
[](const Path& value) -> Option<Error> {
// For now, just check if the URI has a supported scheme.
//
// TODO(josephw): Once we have a proper URI class and parser,
// consider validating this URI more thoroughly.
if (strings::startsWith(value.string(), "http://")
#ifdef USE_SSL_SOCKET
|| (process::network::openssl::flags().enabled &&
strings::startsWith(value.string(), "https://"))
#endif // USE_SSL_SOCKET
) {
Try<process::http::URL> url =
process::http::URL::parse(value.string());
if (url.isError()) {
return Error("Failed to parse URI: " + url.error());
}
return None();
}
// NOTE: The `Path` class will strip off the 'file://' prefix.
if (strings::contains(value.string(), "://")) {
return Error(
"--uri must use a supported scheme (file or http(s))");
}
// We only allow absolute paths for file paths.
if (!value.absolute()) {
return Error("--uri to a file must be an absolute path");
}
return None();
});
add(&Flags::poll_interval,
"poll_interval",
"How long to wait between polling the specified `--uri`.\n"
"The time is checked each time the `translate` method is called.\n"
"If the given time has elapsed, then the URI is re-fetched.\n"
"If not specified, the URI is only fetched once.",
[](const Option<Duration>& value) -> Option<Error> {
if (value.isSome() && value.get() <= Seconds(0)) {
return Error("--poll_interval must be non-negative");
}
return None();
});
add(&Flags::max_random_wait,
"max_random_wait",
"How long at most to wait between discovering a new set of profiles\n"
"and notifying the callers of `watch`. The actual wait time is a\n"
"uniform random value between 0 and this value. If `--uri` points\n"
"to a centralized location, it may be good to scale this number\n"
"according to the number of resource providers in the cluster.",
Seconds(0),
[](const Duration& value) -> Option<Error> {
if (value < Seconds(0)) {
return Error("--max_random_wait must be zero or greater");
}
return None();
});
}
// NOTE: We use the `Path` type here so that the stout flags parser
// does not attempt to read a file if given a `file://` prefixed value.
//
// TODO(josephw): Replace with a URI type when stout gets one.
Path uri;
Option<Duration> poll_interval;
Duration max_random_wait;
};
UriDiskProfileAdaptor(const Flags& _flags);
~UriDiskProfileAdaptor() override;
process::Future<DiskProfileAdaptor::ProfileInfo> translate(
const std::string& profile,
const ResourceProviderInfo& resourceProviderInfo) override;
process::Future<hashset<std::string>> watch(
const hashset<std::string>& knownProfiles,
const ResourceProviderInfo& resourceProviderInfo) override;
protected:
Flags flags;
process::Owned<UriDiskProfileAdaptorProcess> process;
};
class UriDiskProfileAdaptorProcess :
public process::Process<UriDiskProfileAdaptorProcess>
{
public:
UriDiskProfileAdaptorProcess(const UriDiskProfileAdaptor::Flags& _flags);
void initialize() override;
process::Future<DiskProfileAdaptor::ProfileInfo> translate(
const std::string& profile,
const ResourceProviderInfo& resourceProviderInfo);
process::Future<hashset<std::string>> watch(
const hashset<std::string>& knownProfiles,
const ResourceProviderInfo& resourceProviderInfo);
// Helpers for fetching the `--uri`.
// If `--poll_interval` is set, this method will dispatch to itself with
// a delay once the fetch is complete.
// Made public for testing purpose.
void poll();
void _poll(const process::Future<process::http::Response>& future);
void __poll(const Try<std::string>& fetched);
private:
// Helper that is called upon successfully polling and parsing the `--uri`.
// This method will validate that the capability and parameters of a known
// profile must remain the same. Then, any watcher will be notified if its set
// of profiles has been changed.
void notify(const resource_provider::DiskProfileMapping& parsed);
UriDiskProfileAdaptor::Flags flags;
struct ProfileRecord
{
resource_provider::DiskProfileMapping::CSIManifest manifest;
// True if the profile is seen in the last fetched profile mapping.
bool active;
};
// The mapping of all profiles seen so far.
// Profiles can only be added and never removed from this mapping. Once added,
// a profile's volume capability and parameters cannot be changed.
//
// TODO(josephw): Consider persisting this mapping across agent restarts.
hashmap<std::string, ProfileRecord> profileMatrix;
struct WatcherData
{
WatcherData(
const hashset<std::string>& _known, const ResourceProviderInfo& _info)
: known(_known), info(_info) {}
hashset<std::string> known;
ResourceProviderInfo info;
process::Promise<hashset<std::string>> promise;
};
std::vector<WatcherData> watchers;
};
} // namespace storage {
} // namespace internal {
} // namespace mesos {
#endif // __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__