IGNITE-17101 Move ignite-mesos to the Ignite Extensions project (#153)
diff --git a/modules/mesos-ext/README.txt b/modules/mesos-ext/README.txt
new file mode 100644
index 0000000..96143cf
--- /dev/null
+++ b/modules/mesos-ext/README.txt
@@ -0,0 +1,28 @@
+Apache Ignite Mesos Module
+------------------------
+
+Apache Ignite Mesos module provides integration Apache Ignite with Apache Mesos.
+
+Importing Apache Ignite Mesos Module In Maven Project
+-------------------------------------
+
+If you are using Maven to manage dependencies of your project, you can add Mesos module
+dependency like this (replace '${ignite.version}' with actual Ignite version you are
+interested in):
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ ...
+ <dependencies>
+ ...
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-mesos-ext</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ ...
+ </dependencies>
+ ...
+</project>
diff --git a/modules/mesos-ext/licenses/apache-2.0.txt b/modules/mesos-ext/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/mesos-ext/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/modules/mesos-ext/pom.xml b/modules/mesos-ext/pom.xml
new file mode 100644
index 0000000..5b99697
--- /dev/null
+++ b/modules/mesos-ext/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent-ext-internal</artifactId>
+ <version>1</version>
+ <relativePath>../../parent-internal/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-mesos-ext</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <url>https://ignite.apache.org</url>
+
+ <properties>
+ <mesos.version>1.11.0</mesos.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ <version>${mesos.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.ignite.mesos.IgniteFramework</mainClass>
+ </manifest>
+ </archive>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.2</version>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
new file mode 100644
index 0000000..2f158b9
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * The class defines cluster configuration. This configuration created from properties file
+ * that passed on startup or environment variables.
+ * <p>
+ * If Mesos cluster working in Intranet or behind NAT then an access to local resources can be set
+ * by {@link #ignitePackagePath()} or {@link #ignitePackageUrl()} which should be available from nodes.
+ */
+public class ClusterProperties {
+ /** */
+ private static final Logger log = Logger.getLogger(ClusterProperties.class.getSimpleName());
+
+ /** Unlimited. */
+ public static final double UNLIMITED = Double.MAX_VALUE;
+
+ /** */
+ public static final String MESOS_MASTER_URL = "MESOS_MASTER_URL";
+
+ /** */
+ public static final String DEFAULT_MESOS_MASTER_URL = "zk://localhost:2181/mesos";
+
+ /** Mesos master url. */
+ private String mesosUrl = DEFAULT_MESOS_MASTER_URL;
+
+ /** */
+ public static final String IGNITE_JVM_OPTS = "IGNITE_JVM_OPTS";
+
+ /** JVM options. */
+ private String jvmOpts = "";
+
+ /** */
+ public static final String IGNITE_CLUSTER_NAME = "IGNITE_CLUSTER_NAME";
+
+ /** */
+ public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster";
+
+ /** Mesos master url. */
+ private String clusterName = DEFAULT_CLUSTER_NAME;
+
+ /** */
+ public static final String IGNITE_HTTP_SERVER_HOST = "IGNITE_HTTP_SERVER_HOST";
+
+ /** Http server host. */
+ private String httpSrvHost = null;
+
+ /** */
+ public static final String IGNITE_HTTP_SERVER_PORT = "IGNITE_HTTP_SERVER_PORT";
+
+ /** */
+ public static final String DEFAULT_HTTP_SERVER_PORT = "48610";
+
+ /** Http server host. */
+ private int httpSrvPort = Integer.valueOf(DEFAULT_HTTP_SERVER_PORT);
+
+ /** */
+ public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
+
+ /** CPU limit. */
+ private double cpu = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE";
+
+ /** CPU limit. */
+ private double cpuPerNode = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY";
+
+ /** Memory limit. */
+ private double mem = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE";
+
+ /** Memory limit. */
+ private double memPerNode = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_TOTAL_DISK_SPACE = "IGNITE_TOTAL_DISK_SPACE";
+
+ /** Disk space limit. */
+ private double disk = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_DISK_SPACE_PER_NODE = "IGNITE_DISK_SPACE_PER_NODE";
+
+ /** Disk space limit. */
+ private double diskPerNode = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
+
+ /** Node count limit. */
+ private double nodeCnt = UNLIMITED;
+
+ /** */
+ public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE";
+
+ /** */
+ public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
+
+ /** Min memory per node. */
+ private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
+
+ /** */
+ public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE";
+
+ /** */
+ public static final double DEFAULT_RESOURCE_MIN_MEM = 256;
+
+ /** Min memory per node. */
+ private double minMemory = DEFAULT_RESOURCE_MIN_MEM;
+
+ /** */
+ public static final String IGNITE_VERSION = "IGNITE_VERSION";
+
+ /** */
+ public static final String DEFAULT_IGNITE_VERSION = "latest";
+
+ /** Ignite version. */
+ private String igniteVer = DEFAULT_IGNITE_VERSION;
+
+ /** */
+ public static final String IGNITE_PACKAGE_URL = "IGNITE_PACKAGE_URL";
+
+ /** Ignite package url. */
+ private String ignitePkgUrl;
+
+ /** */
+ public static final String IGNITE_PACKAGE_PATH = "IGNITE_PACKAGE_PATH";
+
+ /** Ignite package path. */
+ private String ignitePkgPath;
+
+ /** */
+ public static final String IGNITE_WORK_DIR = "IGNITE_WORK_DIR";
+
+ /** */
+ public static final String DEFAULT_IGNITE_WORK_DIR = "ignite-releases/";
+
+ /** Ignite version. */
+ private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
+
+ /** */
+ public static final String IGNITE_USERS_LIBS = "IGNITE_USERS_LIBS";
+
+ /** Path to users libs. */
+ private String userLibs = null;
+
+ /** */
+ public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL";
+
+ /** URL to users libs. */
+ private String userLibsUrl = null;
+
+ /** */
+ public static final String LICENCE_URL = "LICENCE_URL";
+
+ /** Licence url. */
+ private String licenceUrl = null;
+
+ /** */
+ public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG";
+
+ /** Ignite config. */
+ private String igniteCfg = null;
+
+ /** */
+ public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL";
+
+ /** */
+ public static final String IGNITE_HTTP_SERVER_IDLE_TIMEOUT = "IGNITE_HTTP_SERVER_IDLE_TIMEOUT";
+
+ /** */
+ public static final long IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT = 30000L;
+
+ /** Jetty idle timeout. */
+ private long idleTimeout = IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT;
+
+ /** Url to ignite config. */
+ private String igniteCfgUrl = null;
+
+ /** */
+ public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT";
+
+ /** Url to ignite config. */
+ private Pattern hostnameConstraint = null;
+
+ /** */
+ public ClusterProperties() {
+ // No-op.
+ }
+
+ /**
+ * @return Cluster name.
+ */
+ public String clusterName() {
+ return clusterName;
+ }
+
+ /**
+ * @return CPU count limit.
+ */
+ public double cpus() {
+ return cpu;
+ }
+
+ /**
+ * Sets CPU count limit.
+ *
+ * @param cpu CPU count limit.
+ */
+ public void cpus(double cpu) {
+ this.cpu = cpu;
+ }
+
+ /**
+ * @return CPU count limit.
+ */
+ public double cpusPerNode() {
+ return cpuPerNode;
+ }
+
+ /**
+ * Sets CPU count limit.
+ *
+ * @param cpu CPU per node count limit.
+ */
+ public void cpusPerNode(double cpu) {
+ this.cpuPerNode = cpu;
+ }
+
+ /**
+ * @return mem limit.
+ */
+ public double memory() {
+ return mem;
+ }
+
+ /**
+ * Sets mem limit.
+ *
+ * @param mem Memory.
+ */
+ public void memory(double mem) {
+ this.mem = mem;
+ }
+
+ /**
+ * @return mem limit.
+ */
+ public double memoryPerNode() {
+ return memPerNode;
+ }
+
+ /**
+ * Sets mem limit.
+ *
+ * @param mem Memory.
+ */
+ public void memoryPerNode(double mem) {
+ this.memPerNode = mem;
+ }
+
+ /**
+ * @return JVM opts for ignite.
+ */
+ public String jmvOpts() {
+ return this.jvmOpts;
+ }
+
+ /**
+ * @return disk limit.
+ */
+ public double disk() {
+ return disk;
+ }
+
+ /**
+ * @return disk limit per node.
+ */
+ public double diskPerNode() {
+ return diskPerNode;
+ }
+
+ /**
+ * @return instance count limit.
+ */
+ public double instances() {
+ return nodeCnt;
+ }
+
+ /**
+ * @return min memory per node.
+ */
+ public double minMemoryPerNode() {
+ return minMemory;
+ }
+
+ /**
+ * Sets min memory.
+ *
+ * @param minMemory Min memory.
+ */
+ public void minMemoryPerNode(double minMemory) {
+ this.minMemory = minMemory;
+ }
+
+ /**
+ * Sets hostname constraint.
+ *
+ * @param ptrn Hostname pattern.
+ */
+ public void hostnameConstraint(Pattern ptrn) {
+ this.hostnameConstraint = ptrn;
+ }
+
+ /**
+ * @return min cpu count per node.
+ */
+ public double minCpuPerNode() {
+ return minCpu;
+ }
+
+ /**
+ * Sets min cpu count per node.
+ *
+ * @param minCpu min cpu count per node.
+ */
+ public void minCpuPerNode(double minCpu) {
+ this.minCpu = minCpu;
+ }
+
+ /**
+ * @return Ignite version.
+ */
+ public String igniteVer() {
+ return igniteVer;
+ }
+
+ /**
+ * @return Working directory.
+ */
+ public String igniteWorkDir() {
+ return igniteWorkDir;
+ }
+
+ /**
+ * @return User's libs.
+ */
+ public String userLibs() {
+ return userLibs;
+ }
+
+ /**
+ * @return Ignite configuration.
+ */
+ public String igniteCfg() {
+ return igniteCfg;
+ }
+
+ /**
+ * @return Master url.
+ */
+ public String masterUrl() {
+ return mesosUrl;
+ }
+
+ /**
+ * @return Http server host.
+ */
+ public String httpServerHost() {
+ return httpSrvHost;
+ }
+
+ /**
+ * @return Http server port.
+ */
+ public int httpServerPort() {
+ return httpSrvPort;
+ }
+
+ /**
+ * Sets the maximum Idle time for a http connection, which will be used for
+ * jetty server. The server provides resources for ignite mesos framework such as
+ * ignite archive, user's libs, configurations and etc.
+ *
+ * @return Http server idle timeout.
+ */
+ public long idleTimeout() {
+ return idleTimeout;
+ }
+
+ /**
+ * URL to ignite package. The URL should to point at valid apache ignite archive.
+ * This property can be useful if using own apache ignite build.
+ *
+ * @return Url to ignite package.
+ */
+ public String ignitePackageUrl() {
+ return ignitePkgUrl;
+ }
+
+ /**
+ * Path on local file system to ignite archive. That can be useful when
+ * Mesos working in Intranet or behind NAT.
+ *
+ * @return Path on local host to ignite package.
+ */
+ public String ignitePackagePath() {
+ return ignitePkgPath;
+ }
+
+ /**
+ * @return Url to ignite configuration.
+ */
+ public String igniteConfigUrl() {
+ return igniteCfgUrl;
+ }
+
+ /**
+ * @return Url to users libs configuration.
+ */
+ public String usersLibsUrl() {
+ return userLibsUrl;
+ }
+
+ /**
+ * @return Url to licence.
+ */
+ public String licenceUrl() {
+ return licenceUrl;
+ }
+
+ /**
+ * @return Host name constraint.
+ */
+ public Pattern hostnameConstraint() {
+ return hostnameConstraint;
+ }
+
+ /**
+ * @param cfg path to config file.
+ * @return Cluster configuration.
+ */
+ public static ClusterProperties from(String cfg) {
+ try {
+ Properties props = null;
+
+ if (cfg != null) {
+ props = new Properties();
+
+ try (FileInputStream in = new FileInputStream(cfg)) {
+ props.load(in);
+ }
+ }
+
+ ClusterProperties prop = new ClusterProperties();
+
+ prop.mesosUrl = getStringProperty(MESOS_MASTER_URL, props, DEFAULT_MESOS_MASTER_URL);
+
+ prop.httpSrvHost = getStringProperty(IGNITE_HTTP_SERVER_HOST, props, getNonLoopbackAddress());
+
+ String port = System.getProperty("PORT0");
+
+ if (port != null && !port.isEmpty())
+ prop.httpSrvPort = Integer.valueOf(port);
+ else
+ prop.httpSrvPort = Integer.valueOf(getStringProperty(IGNITE_HTTP_SERVER_PORT, props,
+ DEFAULT_HTTP_SERVER_PORT));
+
+ prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
+
+ prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
+ prop.ignitePkgUrl = getStringProperty(IGNITE_PACKAGE_URL, props, null);
+ prop.ignitePkgPath = getStringProperty(IGNITE_PACKAGE_PATH, props, null);
+ prop.licenceUrl = getStringProperty(LICENCE_URL, props, null);
+ prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
+
+ prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
+ prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
+ prop.disk = getDoubleProperty(IGNITE_TOTAL_DISK_SPACE, props, UNLIMITED);
+ prop.diskPerNode = getDoubleProperty(IGNITE_DISK_SPACE_PER_NODE, props, 1024.0);
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
+ prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
+ prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
+
+ prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, "");
+
+ prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORK_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
+ prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
+
+ String ptrn = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
+
+ prop.idleTimeout = getLongProperty(IGNITE_HTTP_SERVER_IDLE_TIMEOUT, props, IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT);
+
+ if (ptrn != null) {
+ try {
+ prop.hostnameConstraint = Pattern.compile(ptrn);
+ }
+ catch (PatternSyntaxException e) {
+ log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+ }
+ }
+
+ return prop;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param name Property name.
+ * @param fileProps Property file.
+ * @return Property value.
+ */
+ private static double getDoubleProperty(String name, Properties fileProps, Double dfltVal) {
+ if (fileProps != null && fileProps.containsKey(name))
+ return Double.valueOf(fileProps.getProperty(name));
+
+ String prop = System.getProperty(name);
+
+ if (prop == null)
+ prop = System.getenv(name);
+
+ return prop == null ? dfltVal : Double.valueOf(prop);
+ }
+
+ /**
+ * @param name Property name.
+ * @param fileProps Property file.
+ * @return Property value.
+ */
+ private static long getLongProperty(String name, Properties fileProps, Long dfltVal) {
+ if (fileProps != null && fileProps.containsKey(name))
+ return Long.valueOf(fileProps.getProperty(name));
+
+ String prop = System.getProperty(name);
+
+ if (prop == null)
+ prop = System.getenv(name);
+
+ return prop == null ? dfltVal : Long.valueOf(prop);
+ }
+
+ /**
+ * @param name Property name.
+ * @param fileProps Property file.
+ * @return Property value.
+ */
+ private static String getStringProperty(String name, Properties fileProps, String dfltVal) {
+ if (fileProps != null && fileProps.containsKey(name))
+ return fileProps.getProperty(name);
+
+ String prop = System.getProperty(name);
+
+ if (prop == null)
+ prop = System.getenv(name);
+
+ return prop == null ? dfltVal : prop;
+ }
+
+ /**
+ * Finds a local, non-loopback, IPv4 address
+ *
+ * @return The first non-loopback IPv4 address found, or <code>null</code> if no such addresses found
+ * @throws java.net.SocketException If there was a problem querying the network interfaces
+ */
+ public static String getNonLoopbackAddress() throws SocketException {
+ Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+
+ while (ifaces.hasMoreElements()) {
+ NetworkInterface iface = ifaces.nextElement();
+
+ Enumeration<InetAddress> addrs = iface.getInetAddresses();
+
+ while (addrs.hasMoreElements()) {
+ InetAddress addr = addrs.nextElement();
+
+ if (addr instanceof Inet4Address && !addr.isLoopbackAddress())
+ return addr.getHostAddress();
+ }
+ }
+
+ throw new RuntimeException("Failed. Could not find non-loopback address");
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
new file mode 100644
index 0000000..b9cba99
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.ignite.mesos.resource.IgniteProvider;
+import org.apache.ignite.mesos.resource.JettyServer;
+import org.apache.ignite.mesos.resource.ResourceHandler;
+import org.apache.ignite.mesos.resource.ResourceProvider;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+
+/**
+ * Ignite mesos framework.
+ */
+public class IgniteFramework {
+ /** */
+ private static final Logger log = Logger.getLogger(IgniteFramework.class.getSimpleName());
+
+ /** Framework name. */
+ private static final String IGNITE_FRAMEWORK_NAME = "Ignite";
+
+ /** MESOS system environment name */
+ private static final String MESOS_USER_NAME = "MESOS_USER";
+
+ /** MESOS system environment role */
+ private static final String MESOS_ROLE = "MESOS_ROLE";
+
+ /** */
+ private static final String MESOS_AUTHENTICATE = "MESOS_AUTHENTICATE";
+
+ /** */
+ private static final String DEFAULT_PRINCIPAL = "DEFAULT_PRINCIPAL";
+
+ /** */
+ private static final String DEFAULT_SECRET = "DEFAULT_SECRET";
+
+ /** */
+ private static final String MESOS_CHECKPOINT = "MESOS_CHECKPOINT";
+
+ /**
+ * Main methods has only one optional parameter - path to properties files.
+ *
+ * @param args Args.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ IgniteFramework igniteFramework = new IgniteFramework();
+
+ ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null);
+
+ String baseUrl = String.format("http://%s:%d", clusterProps.httpServerHost(), clusterProps.httpServerPort());
+
+ JettyServer httpSrv = new JettyServer();
+
+ httpSrv.start(
+ new ResourceHandler(clusterProps.userLibs(), clusterProps.igniteCfg(), clusterProps.igniteWorkDir()),
+ clusterProps
+ );
+
+ ResourceProvider provider = new ResourceProvider();
+
+ IgniteProvider igniteProvider = new IgniteProvider(clusterProps.igniteWorkDir());
+
+ provider.init(clusterProps, igniteProvider, baseUrl);
+
+ // Create the scheduler.
+ Scheduler scheduler = new IgniteScheduler(clusterProps, provider);
+
+ // Create the driver.
+ MesosSchedulerDriver driver;
+
+ if (System.getenv(MESOS_AUTHENTICATE) != null) {
+ log.info("Enabling authentication for the framework");
+
+ if (System.getenv(DEFAULT_PRINCIPAL) == null) {
+ log.log(Level.SEVERE, "Expecting authentication principal in the environment");
+
+ System.exit(1);
+ }
+
+ if (System.getenv(DEFAULT_SECRET) == null) {
+ log.log(Level.SEVERE, "Expecting authentication secret in the environment");
+
+ System.exit(1);
+ }
+
+ Protos.Credential cred = Protos.Credential.newBuilder()
+ .setPrincipal(System.getenv(DEFAULT_PRINCIPAL))
+ .setSecret(System.getenv(DEFAULT_SECRET))
+ .build();
+
+ driver = new MesosSchedulerDriver(scheduler, igniteFramework.getFrameworkInfo(), clusterProps.masterUrl(),
+ cred);
+ }
+ else
+ driver = new MesosSchedulerDriver(scheduler, igniteFramework.getFrameworkInfo(), clusterProps.masterUrl());
+
+ int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
+
+ httpSrv.stop();
+
+ // Ensure that the driver process terminates.
+ driver.stop();
+
+ System.exit(status);
+ }
+
+ /**
+ * @return Mesos Protos FrameworkInfo.
+ */
+ public Protos.FrameworkInfo getFrameworkInfo() throws Exception {
+ final int frameworkFailoverTimeout = 0;
+
+ Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
+ .setName(IGNITE_FRAMEWORK_NAME)
+ .setUser(getUser())
+ .setRole(getRole())
+ .setFailoverTimeout(frameworkFailoverTimeout);
+
+ if (System.getenv(MESOS_CHECKPOINT) != null) {
+ log.info("Enabling checkpoint for the framework");
+
+ frameworkBuilder.setCheckpoint(true);
+ }
+
+ if (System.getenv(MESOS_AUTHENTICATE) != null)
+ frameworkBuilder.setPrincipal(System.getenv(DEFAULT_PRINCIPAL));
+ else
+ frameworkBuilder.setPrincipal("ignite-framework-java");
+
+ return frameworkBuilder.build();
+ }
+
+ /**
+ * @return Mesos user name value.
+ */
+ protected String getUser() {
+ String userName = System.getenv(MESOS_USER_NAME);
+
+ return userName != null ? userName : "";
+ }
+
+ /**
+ * @return Mesos role value.
+ */
+ protected String getRole() {
+ String mesosRole = System.getenv(MESOS_ROLE);
+
+ return isRoleValid(mesosRole) ? mesosRole : "*";
+ }
+
+ /**
+ * @return Result of Mesos role validation.
+ */
+ static boolean isRoleValid(String mRole) {
+ if (mRole == null || mRole.isEmpty() || mRole.equals(".") || mRole.equals("..") ||
+ mRole.startsWith("-") || mRole.contains("/") || mRole.contains("\\") || mRole.contains(" ")) {
+ log.severe("Provided mesos role is not valid: [" + mRole +
+ "]. Mesos role should be a valid directory name.");
+
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
new file mode 100644
index 0000000..3a58f65
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.ignite.mesos.resource.ResourceProvider;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+/**
+ * Ignite scheduler receives offers from Mesos and decides how many resources will be occupied.
+ */
+public class IgniteScheduler implements Scheduler {
+ /** Cpus. */
+ public static final String CPU = "cpus";
+
+ /** Mem. */
+ public static final String MEM = "mem";
+
+ /** Disk. */
+ public static final String DISK = "disk";
+
+ /** Default port range. */
+ public static final String DEFAULT_PORT = ":47500..47510";
+
+ /** Delimiter char. */
+ public static final String DELIM = ",";
+
+ /** Logger. */
+ private static final Logger log = Logger.getLogger(IgniteScheduler.class.getSimpleName());
+
+ /** ID generator. */
+ private AtomicInteger taskIdGenerator = new AtomicInteger();
+
+ /** Task on host. */
+ private Map<String, IgniteTask> tasks = new HashMap<>();
+
+ /** Cluster resources. */
+ private ClusterProperties clusterProps;
+
+ /** Resource provider. */
+ private ResourceProvider resourceProvider;
+
+ /**
+ * @param clusterProps Cluster limit.
+ * @param resourceProvider Resource provider.
+ */
+ public IgniteScheduler(ClusterProperties clusterProps, ResourceProvider resourceProvider) {
+ this.clusterProps = clusterProps;
+ this.resourceProvider = resourceProvider;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
+ log.log(Level.FINE, "Offers resources: {0}", offers.size());
+
+ for (Protos.Offer offer : offers) {
+ IgniteTask igniteTask = checkOffer(offer);
+
+ // Decline offer which doesn't match by mem or cpu.
+ if (igniteTask == null) {
+ schedulerDriver.declineOffer(offer.getId());
+
+ continue;
+ }
+
+ // Generate a unique task ID.
+ Protos.TaskID taskId = Protos.TaskID.newBuilder()
+ .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
+
+ log.log(Level.INFO, "Launching task: {0}", igniteTask);
+
+ // Create task to run.
+ Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
+
+ try {
+ schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
+ Collections.singletonList(task),
+ Protos.Filters.newBuilder().setRefuseSeconds(1).build());
+ }
+ catch (RuntimeException e) {
+ log.log(Level.SEVERE, "Failed launch task. Task id: {0}. Task info: {1}",
+ new Object[]{taskId, task, e});
+
+ throw e;
+ }
+
+ tasks.put(taskId.getValue(), igniteTask);
+ }
+ }
+
+ /**
+ * Create Task.
+ *
+ * @param offer Offer.
+ * @param igniteTask Task description.
+ * @param taskId Task id.
+ * @return Task.
+ */
+ private Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) {
+ String cfgUrl = clusterProps.igniteConfigUrl() != null ?
+ clusterProps.igniteConfigUrl() : resourceProvider.igniteConfigUrl();
+
+ Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder()
+ .setEnvironment(Protos.Environment.newBuilder()
+ .addVariables(Protos.Environment.Variable.newBuilder()
+ .setName("IGNITE_TCP_DISCOVERY_ADDRESSES")
+ .setValue(getAddress(offer.getHostname())))
+ .addVariables(Protos.Environment.Variable.newBuilder()
+ .setName("JVM_OPTS")
+ .setValue(clusterProps.jmvOpts())))
+ .addUris(Protos.CommandInfo.URI.newBuilder()
+ .setValue(clusterProps.ignitePackageUrl() != null ?
+ clusterProps.ignitePackageUrl() : resourceProvider.igniteUrl())
+ .setExtract(true))
+ .addUris(Protos.CommandInfo.URI.newBuilder()
+ .setValue(cfgUrl));
+
+ // Collection user's libs.
+ Collection<String> usersLibs = new ArrayList<>();
+
+ if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty())
+ Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM));
+
+ if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty())
+ usersLibs.addAll(resourceProvider.resourceUrl());
+
+ for (String url : usersLibs)
+ builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
+
+ String cfgName = resourceProvider.configName();
+
+ if (clusterProps.igniteConfigUrl() != null)
+ cfgName = fileName(clusterProps.igniteConfigUrl());
+
+ String licenceFile = null;
+
+ if (clusterProps.licenceUrl() != null)
+ licenceFile = fileName(clusterProps.licenceUrl());
+
+ builder.setValue(
+ (licenceFile != null ? "find . -maxdepth 1 -name \"" + licenceFile + "\" -exec cp {} ./*/ \\; && " : "")
+ + "find . -maxdepth 1 -name \"*.jar\" -exec cp {} ./*/libs/ \\; && "
+ + "./*/bin/ignite.sh "
+ + cfgName
+ + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+ + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
+
+ return Protos.TaskInfo.newBuilder()
+ .setName("Ignite node " + taskId.getValue())
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId())
+ .setCommand(builder)
+ .addResources(Protos.Resource.newBuilder()
+ .setName(CPU)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
+ .addResources(Protos.Resource.newBuilder()
+ .setName(MEM)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
+ .addResources(Protos.Resource.newBuilder()
+ .setName(DISK)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.disk())))
+ .build();
+ }
+
+ /**
+ * @param path Path.
+ * @return File name.
+ */
+ private String fileName(String path) {
+ String[] split = path.split("/");
+
+ return split[split.length - 1];
+ }
+
+ /**
+ * @return Address running nodes.
+ */
+ private String getAddress(String address) {
+ if (tasks.isEmpty()) {
+ if (address != null && !address.isEmpty())
+ return address + DEFAULT_PORT;
+
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ for (IgniteTask task : tasks.values())
+ sb.append(task.host()).append(DEFAULT_PORT).append(DELIM);
+
+ return sb.substring(0, sb.length() - 1);
+ }
+
+ /**
+ * Check slave resources and return resources infos.
+ *
+ * @param offer Offer request.
+ * @return Ignite task description.
+ */
+ private IgniteTask checkOffer(Protos.Offer offer) {
+ // Check limit on running nodes.
+ if (clusterProps.instances() <= tasks.size())
+ return null;
+
+ double cpus = -1;
+ double mem = -1;
+ double disk = -1;
+
+ // Check host name
+ if (clusterProps.hostnameConstraint() != null
+ && clusterProps.hostnameConstraint().matcher(offer.getHostname()).matches())
+ return null;
+
+ // Collect resource on slave.
+ for (Protos.Resource resource : offer.getResourcesList()) {
+ if (resource.getName().equals(CPU)) {
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ cpus = resource.getScalar().getValue();
+ else
+ log.log(Level.FINE, "Cpus resource was not a scalar: {0}" + resource.getType());
+ }
+ else if (resource.getName().equals(MEM)) {
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ mem = resource.getScalar().getValue();
+ else
+ log.log(Level.FINE, "Mem resource was not a scalar: {0}", resource.getType());
+ }
+ else if (resource.getName().equals(DISK))
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ disk = resource.getScalar().getValue();
+ else
+ log.log(Level.FINE, "Disk resource was not a scalar: {0}", resource.getType());
+ }
+
+ // Check that slave satisfies min requirements.
+ if (cpus < clusterProps.minCpuPerNode() || mem < clusterProps.minMemoryPerNode()) {
+ log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+
+ return null;
+ }
+
+ double totalCpus = 0;
+ double totalMem = 0;
+ double totalDisk = 0;
+
+ // Collect occupied resources.
+ for (IgniteTask task : tasks.values()) {
+ totalCpus += task.cpuCores();
+ totalMem += task.mem();
+ totalDisk += task.disk();
+ }
+
+ cpus = Math.min(clusterProps.cpus() - totalCpus, Math.min(cpus, clusterProps.cpusPerNode()));
+ mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode()));
+ disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode()));
+
+ if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus)
+ || (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) {
+ log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+
+ return null;
+ }
+
+ if (cpus > 0 && mem > 0)
+ return new IgniteTask(offer.getHostname(), cpus, mem, disk);
+ else {
+ log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
+ final String taskId = taskStatus.getTaskId().getValue();
+
+ log.log(Level.INFO, "Received update event task: {0} is in state: {1}",
+ new Object[]{taskId, taskStatus.getState()});
+
+ if (taskStatus.getState().equals(Protos.TaskState.TASK_FAILED)
+ || taskStatus.getState().equals(Protos.TaskState.TASK_ERROR)
+ || taskStatus.getState().equals(Protos.TaskState.TASK_FINISHED)
+ || taskStatus.getState().equals(Protos.TaskState.TASK_KILLED)
+ || taskStatus.getState().equals(Protos.TaskState.TASK_LOST)) {
+ IgniteTask failedTask = tasks.remove(taskId);
+
+ if (failedTask != null) {
+ List<Protos.Request> requests = new ArrayList<>();
+
+ Protos.Request request = Protos.Request.newBuilder()
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(MEM)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(CPU)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
+ .build();
+
+ requests.add(request);
+
+ schedulerDriver.requestResources(requests);
+ }
+ }
+ }
+
+ /**
+ * @param clusterProps Cluster properties.
+ */
+ public void setClusterProps(ClusterProperties clusterProps) {
+ this.clusterProps = clusterProps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
+ Protos.MasterInfo masterInfo) {
+ log.log(Level.INFO, "Scheduler registered. Master: {0}:{1}, framework={2}", new Object[]{masterInfo.getIp(),
+ masterInfo.getPort(), frameworkID});
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disconnected(SchedulerDriver schedulerDriver) {
+ log.info("Scheduler disconnected.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(SchedulerDriver schedulerDriver, String s) {
+ log.log(Level.SEVERE, "Failed. Error message: {0}", s);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
+ Protos.SlaveID slaveID, byte[] bytes) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
+ Protos.SlaveID slaveID, int i) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
+ // No-op.
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java
new file mode 100644
index 0000000..391a381
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+/**
+ * Information about launched task.
+ */
+public class IgniteTask {
+ /** */
+ public final String host;
+
+ /** */
+ public final double cpuCores;
+
+ /** */
+ public final double mem;
+
+ /** */
+ public final double disk;
+
+ /**
+ * Ignite launched task.
+ *
+ * @param host Host.
+ * @param cpuCores Cpu cores count.
+ * @param mem Memory.
+ * @param disk Disk.
+ */
+ public IgniteTask(String host, double cpuCores, double mem, double disk) {
+ this.host = host;
+ this.cpuCores = cpuCores;
+ this.mem = mem;
+ this.disk = disk;
+ }
+
+ /**
+ * @return Host.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Cores count.
+ */
+ public double cpuCores() {
+ return cpuCores;
+ }
+
+ /**
+ * @return Memory.
+ */
+ public double mem() {
+ return mem;
+ }
+
+ /**
+ * @return Disk.
+ */
+ public double disk() {
+ return disk;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "IgniteTask " +
+ "host: [" + host + ']' +
+ ", cpuCores: [" + cpuCores + "]" +
+ ", mem: [" + mem + "]";
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java
new file mode 100644
index 0000000..6e6a0f6
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains classes to support integration with Apache Mesos.
+ */
+
+package org.apache.ignite.mesos;
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
new file mode 100644
index 0000000..bd6d471
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ignite.mesos.ClusterProperties;
+
+import static org.apache.ignite.mesos.ClusterProperties.IGNITE_VERSION;
+
+/**
+ * Class downloads and stores Ignite.
+ */
+public class IgniteProvider {
+ /** Logger. */
+ private static final Logger log = Logger.getLogger(IgniteProvider.class.getSimpleName());
+
+ /** */
+ private static final String DOWNLOAD_URL_PATTERN = "https://archive.apache.org/dist/ignite/%s/apache-ignite-%s-bin.zip";
+
+ /** URL for request Ignite latest version. */
+ private static final String IGNITE_LATEST_VERSION_URL = "https://ignite.apache.org/latest";
+
+ /** Mirrors. */
+ private static final String APACHE_MIRROR_URL = "https://www.apache.org/dyn/closer.cgi?as_json=1";
+
+ /** Ignite on Apache URL path. */
+ private static final String IGNITE_PATH = "/ignite/%s/apache-ignite-%s-bin.zip";
+
+ /** Version pattern. */
+ private static final Pattern VERSION_PATTERN = Pattern.compile("(?<=version=).*\\S+");
+
+ /** */
+ private String downloadFolder;
+
+ /**
+ * @param downloadFolder Folder with ignite.
+ */
+ public IgniteProvider(String downloadFolder) {
+ this.downloadFolder = downloadFolder;
+ }
+
+ /**
+ * @param ver Ignite version.
+ * @return Path to latest ignite.
+ * @throws IOException If downloading failed.
+ */
+ public String getIgnite(String ver) throws IOException {
+ return downloadIgnite(ver);
+ }
+
+ /**
+ * @param ver Ignite version which will be downloaded. If {@code null} will download the latest ignite version.
+ * @return Ignite archive.
+ * @throws IOException If downloading failed.
+ */
+ public String downloadIgnite(String ver) throws IOException {
+ assert ver != null;
+
+ URL url;
+
+ // get the latest version.
+ if (ver.equals(ClusterProperties.DEFAULT_IGNITE_VERSION)) {
+ try {
+ ver = findLatestVersion();
+
+ // and try to retrieve from a mirror.
+ url = new URL(String.format(findMirror() + IGNITE_PATH, ver, ver));
+ }
+ catch (Exception e) {
+ // fallback to archive.
+ url = new URL(String.format(DOWNLOAD_URL_PATTERN, ver, ver));
+ }
+ }
+ else {
+ // or from archive.
+ url = new URL(String.format(DOWNLOAD_URL_PATTERN, ver, ver));
+ }
+
+ return downloadIgnite(url);
+ }
+
+ /**
+ * Attempts to retrieve the preferred mirror.
+ *
+ * @return Mirror url.
+ * @throws IOException If failed.
+ */
+ private String findMirror() throws IOException {
+ String response = getHttpContents(new URL(APACHE_MIRROR_URL));
+
+ if (response == null)
+ throw new RuntimeException("Failed to retrieve mirrors");
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode mirrorUrl = mapper.readTree(response).get("preferred");
+
+ if (mirrorUrl == null)
+ throw new RuntimeException("Failed to find the preferred mirror");
+
+ return mirrorUrl.asText();
+ }
+
+ /**
+ * Attempts to obtain the latest version.
+ *
+ * @return Latest version.
+ * @throws IOException If failed.
+ */
+ private String findLatestVersion() throws IOException {
+ String response = getHttpContents(new URL(IGNITE_LATEST_VERSION_URL));
+
+ if (response == null)
+ throw new RuntimeException("Failed to identify the latest version. Specify it with " + IGNITE_VERSION);
+
+ Matcher m = VERSION_PATTERN.matcher(response);
+ if (m.find())
+ return m.group();
+ else
+ throw new RuntimeException("Failed to retrieve the latest version. Specify it with " + IGNITE_VERSION);
+ }
+
+ /**
+ * @param url Url.
+ * @return Contents.
+ * @throws IOException If failed.
+ */
+ private String getHttpContents(URL url) throws IOException {
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code != 200)
+ throw null;
+
+ BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
+ return rd.lines().collect(Collectors.joining());
+ }
+
+ /**
+ * Downloads ignite by URL if this version wasn't downloaded before.
+ *
+ * @param url URL to Ignite.
+ * @return File name.
+ */
+ private String downloadIgnite(URL url) {
+ assert url != null;
+
+ try {
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code == 200) {
+ checkDownloadFolder();
+
+ String fileName = fileName(url.toString());
+
+ if (fileExist(fileName))
+ return fileName;
+
+ log.log(Level.INFO, "Downloading from {0}", url.toString());
+
+ FileOutputStream outFile = new FileOutputStream(Paths.get(downloadFolder, fileName).toFile());
+
+ outFile.getChannel().transferFrom(Channels.newChannel(conn.getInputStream()), 0, Long.MAX_VALUE);
+
+ outFile.close();
+
+ return fileName;
+ }
+ else
+ throw new RuntimeException("Got unexpected response code. Response code: " + code + " from " + url);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to download Ignite.", e);
+ }
+ }
+
+ /**
+ * Checks that file exists.
+ *
+ * @param fileName File name.
+ * @return {@code True} if file exist otherwise {@code false}.
+ */
+ private boolean fileExist(String fileName) {
+ String pathToIgnite = downloadFolder + (downloadFolder.endsWith("/") ? "" : '/') + fileName;
+
+ return new File(pathToIgnite).exists();
+ }
+
+ /**
+ * Copy file to working directory.
+ *
+ * @param filePath File path.
+ * @return File name.
+ * @throws IOException If coping failed.
+ */
+ String copyToWorkDir(String filePath) throws IOException {
+ Path srcFile = Paths.get(filePath);
+
+ if (Files.exists(srcFile)) {
+ checkDownloadFolder();
+
+ Path newDir = Paths.get(downloadFolder);
+
+ Path fileName = srcFile.getFileName();
+
+ Files.copy(srcFile, newDir.resolve(fileName), StandardCopyOption.REPLACE_EXISTING);
+
+ return fileName.toString();
+ }
+
+ return null;
+ }
+
+ /**
+ * @return Download folder.
+ */
+ private File checkDownloadFolder() {
+ File file = new File(downloadFolder);
+
+ if (!file.exists())
+ file.mkdirs();
+
+ if (!file.exists())
+ throw new IllegalArgumentException("Failed to create working directory: " + downloadFolder);
+
+ return file;
+ }
+
+ /**
+ * @param url URL.
+ * @return File name.
+ */
+ private static String fileName(String url) {
+ String[] split = url.split("/");
+
+ return split[split.length - 1];
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
new file mode 100644
index 0000000..33e6c1e
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import org.apache.ignite.mesos.ClusterProperties;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+
+/**
+ * Embedded jetty server.
+ */
+public class JettyServer {
+ /** */
+ private Server server;
+
+ /**
+ * Starts jetty server.
+ *
+ * @param handler Handler.
+ * @param props Cluster properties.
+ * @throws Exception If failed.
+ */
+ public void start(Handler handler, ClusterProperties props) throws Exception {
+ if (server == null) {
+ server = new Server();
+
+ ServerConnector connector = new ServerConnector(server);
+
+ connector.setHost(props.httpServerHost());
+ connector.setPort(props.httpServerPort());
+ connector.setIdleTimeout(props.idleTimeout());
+
+ server.addConnector(connector);
+ server.setHandler(handler);
+
+ server.start();
+ }
+ else
+ throw new IllegalStateException("Jetty server has already been started.");
+ }
+
+ /**
+ * Stops server.
+ *
+ * @throws Exception If failed.
+ */
+ public void stop() throws Exception {
+ if (server != null)
+ server.stop();
+ else
+ throw new IllegalStateException("Jetty server has not yet been started.");
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
new file mode 100644
index 0000000..51de26b
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.server.HttpOutput;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * HTTP controller which provides on slave resources.
+ */
+public class ResourceHandler extends AbstractHandler {
+ /** */
+ public static final String IGNITE_PREFIX = "/ignite/";
+
+ /** */
+ public static final String LIBS_PREFIX = "/libs/";
+
+ /** */
+ public static final String CONFIG_PREFIX = "/config/";
+
+ /** */
+ public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/";
+
+ /** */
+ private String libsDir;
+
+ /** */
+ private String cfgPath;
+
+ /** */
+ private String igniteDir;
+
+ /**
+ * @param libsDir Directory with user's libs.
+ * @param cfgPath Path to config file.
+ * @param igniteDir Directory with ignites.
+ */
+ public ResourceHandler(String libsDir, String cfgPath, String igniteDir) {
+ this.libsDir = libsDir;
+ this.cfgPath = cfgPath;
+ this.igniteDir = igniteDir;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void handle(
+ String url,
+ Request request,
+ HttpServletRequest httpServletRequest,
+ HttpServletResponse response) throws IOException, ServletException {
+
+ String[] path = url.split("/");
+
+ String fileName = path[path.length - 1];
+
+ String servicePath = url.substring(0, url.length() - fileName.length());
+
+ switch (servicePath) {
+ case IGNITE_PREFIX:
+ handleRequest(response, "application/zip-archive", igniteDir + "/" + fileName);
+
+ request.setHandled(true);
+ break;
+
+ case LIBS_PREFIX:
+ handleRequest(response, "application/java-archive", libsDir + "/" + fileName);
+
+ request.setHandled(true);
+ break;
+
+ case CONFIG_PREFIX:
+ handleRequest(response, "application/xml", cfgPath);
+
+ request.setHandled(true);
+ break;
+
+ case DEFAULT_CONFIG:
+ handleRequest(response, "application/xml",
+ Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName),
+ fileName);
+
+ request.setHandled(true);
+ break;
+ }
+ }
+
+ /**
+ * @param response Http response.
+ * @param type Type.
+ * @param path Path to file.
+ * @throws IOException If failed.
+ */
+ private static void handleRequest(HttpServletResponse response, String type, String path) throws IOException {
+ Path path0 = Paths.get(path);
+
+ response.setContentType(type);
+ response.setHeader("Content-Disposition", "attachment; filename=\"" + path0.getFileName() + "\"");
+
+ try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+ out.sendContent(FileChannel.open(path0, StandardOpenOption.READ));
+ }
+ }
+
+ /**
+ * @param response Http response.
+ * @param type Type.
+ * @param stream Stream.
+ * @param attachmentName Attachment name.
+ * @throws IOException If failed.
+ */
+ private static void handleRequest(HttpServletResponse response, String type, InputStream stream,
+ String attachmentName) throws IOException {
+ response.setContentType(type);
+ response.setHeader("Content-Disposition", "attachment; filename=\"" + attachmentName + "\"");
+
+ try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+ out.sendContent(stream);
+ }
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
new file mode 100644
index 0000000..47fdede
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.ignite.mesos.ClusterProperties;
+
+import static org.apache.ignite.mesos.resource.ResourceHandler.CONFIG_PREFIX;
+import static org.apache.ignite.mesos.resource.ResourceHandler.DEFAULT_CONFIG;
+import static org.apache.ignite.mesos.resource.ResourceHandler.IGNITE_PREFIX;
+import static org.apache.ignite.mesos.resource.ResourceHandler.LIBS_PREFIX;
+
+/**
+ * Provides path to user's libs and config file.
+ */
+public class ResourceProvider {
+ /** */
+ private static final Logger log = Logger.getLogger(ResourceProvider.class.getSimpleName());
+
+ /** Ignite url. */
+ private String igniteUrl;
+
+ /** Resources. */
+ private Collection<String> libsUris;
+
+ /** Url config. */
+ private String cfgUrl;
+
+ /** Config name. */
+ private String cfgName;
+
+ /**
+ * @param props Cluster properties.
+ * @param provider Ignite provider.
+ * @param baseUrl Base url.
+ */
+ public void init(ClusterProperties props, IgniteProvider provider, String baseUrl) throws IOException {
+ if (props.ignitePackageUrl() == null && props.ignitePackagePath() == null) {
+ // Downloading ignite.
+ try {
+ igniteUrl = baseUrl + IGNITE_PREFIX + provider.getIgnite(props.igniteVer());
+ }
+ catch (Exception e) {
+ log.log(Level.SEVERE, "Failed to download Ignite [err={0}, ver={1}].\n" +
+ "If application working behind NAT or Intranet and does not have access to external resources " +
+ "then you can use IGNITE_PACKAGE_URL or IGNITE_PACKAGE_PATH property that allow to use local " +
+ "resources.",
+ new Object[]{e, props.igniteVer()});
+ }
+ }
+
+ if (props.ignitePackagePath() != null) {
+ Path ignitePackPath = Paths.get(props.ignitePackagePath());
+
+ if (Files.exists(ignitePackPath) && !Files.isDirectory(ignitePackPath)) {
+ try {
+ String fileName = provider.copyToWorkDir(props.ignitePackagePath());
+
+ assert fileName != null;
+
+ igniteUrl = baseUrl + IGNITE_PREFIX + fileName;
+ }
+ catch (Exception e) {
+ log.log(Level.SEVERE, "Failed to copy Ignite to working directory [err={0}, path={1}].",
+ new Object[] {e, props.ignitePackagePath()});
+
+ throw e;
+ }
+ }
+ else
+ throw new IllegalArgumentException("Failed to find a ignite archive by path: "
+ + props.ignitePackagePath());
+ }
+
+ // Find all jar files into user folder.
+ if (props.userLibs() != null && !props.userLibs().isEmpty()) {
+ File libsDir = new File(props.userLibs());
+
+ List<String> libs = new ArrayList<>();
+
+ if (libsDir.isDirectory()) {
+ File[] files = libsDir.listFiles();
+
+ if (files != null) {
+ for (File lib : files) {
+ if (lib.isFile() && lib.canRead() &&
+ (lib.getName().endsWith(".jar") || lib.getName().endsWith(".JAR")))
+ libs.add(baseUrl + LIBS_PREFIX + lib.getName());
+ }
+ }
+ }
+
+ libsUris = libs.isEmpty() ? null : libs;
+ }
+
+ // Set configuration url.
+ if (props.igniteCfg() != null) {
+ File cfg = new File(props.igniteCfg());
+
+ if (cfg.isFile() && cfg.canRead()) {
+ cfgUrl = baseUrl + CONFIG_PREFIX + cfg.getName();
+
+ cfgName = cfg.getName();
+ }
+ }
+ else {
+ cfgName = "ignite-default-config.xml";
+
+ cfgUrl = baseUrl + DEFAULT_CONFIG + cfgName;
+ }
+ }
+
+ /**
+ * @return Config name.
+ */
+ public String configName() {
+ return cfgName;
+ }
+
+ /**
+ * @return Ignite url.
+ */
+ public String igniteUrl() {
+ return igniteUrl;
+ }
+
+ /**
+ * @return Urls to user's libs.
+ */
+ public Collection<String> resourceUrl() {
+ return libsUris;
+ }
+
+ /**
+ * @return Url to config file.
+ */
+ public String igniteConfigUrl() {
+ return cfgUrl;
+ }
+}
diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java
new file mode 100644
index 0000000..a90176d
--- /dev/null
+++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains classes provide access to resources.
+ */
+
+package org.apache.ignite.mesos.resource;
diff --git a/modules/mesos-ext/src/main/resources/ignite-default-config.xml b/modules/mesos-ext/src/main/resources/ignite-default-config.xml
new file mode 100644
index 0000000..2f26398
--- /dev/null
+++ b/modules/mesos-ext/src/main/resources/ignite-default-config.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"/>
+ </property>
+
+ <property name="joinTimeout" value="60000"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
diff --git a/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
new file mode 100644
index 0000000..bab4b73
--- /dev/null
+++ b/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.mesos.IgniteSchedulerSelfTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Apache Mesos integration tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ IgniteSchedulerSelfTest.class
+})
+public class IgniteMesosTestSuite {
+}
diff --git a/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
new file mode 100644
index 0000000..b362bee
--- /dev/null
+++ b/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.ignite.mesos.resource.ResourceProvider;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.scheduler.Protos.OfferConstraints;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Scheduler tests.
+ */
+public class IgniteSchedulerSelfTest {
+ /** */
+ private IgniteScheduler scheduler;
+
+ /** */
+ @Before
+ public void setUp() throws Exception {
+ ClusterProperties clustProp = new ClusterProperties();
+
+ scheduler = new IgniteScheduler(clustProp, new ResourceProvider() {
+ @Override public String configName() {
+ return "config.xml";
+ }
+
+ @Override public String igniteUrl() {
+ return "ignite.jar";
+ }
+
+ @Override public String igniteConfigUrl() {
+ return "config.xml";
+ }
+
+ @Override public Collection<String> resourceUrl() {
+ return null;
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHostRegister() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+ assertEquals(1, mock.launchedTask.size());
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0);
+ assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeclineByCpu() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.cpus(2);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+ assertEquals(1, mock.launchedTask.size());
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0);
+ assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0);
+
+ mock.clear();
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ Protos.OfferID declinedOffer = mock.declinedOffer;
+
+ assertEquals(offer.getId(), declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeclineByMem() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.memory(512);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+ assertEquals(1, mock.launchedTask.size());
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0);
+ assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0);
+
+ mock.clear();
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ Protos.OfferID declinedOffer = mock.declinedOffer;
+
+ assertEquals(offer.getId(), declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeclineByMemCpu() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 1, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.cpus(4);
+ clustProp.memory(2000);
+
+ scheduler.setClusterProps(clustProp);
+
+ double totalMem = 0, totalCpu = 0;
+
+ for (int i = 0; i < 2; i++) {
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+ assertEquals(1, mock.launchedTask.size());
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPU);
+ totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM);
+
+ mock.clear();
+ }
+
+ assertEquals(2.0, totalCpu, 0);
+ assertEquals(2000.0, totalMem, 0);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ Protos.OfferID declinedOffer = mock.declinedOffer;
+
+ assertEquals(offer.getId(), declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeclineByCpuMinRequirements() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.minCpuPerNode(12);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.declinedOffer);
+
+ assertEquals(offer.getId(), mock.declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeclineByMemMinRequirements() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.minMemoryPerNode(15000);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.declinedOffer);
+
+ assertEquals(offer.getId(), mock.declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHosthameConstraint() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.hostnameConstraint(Pattern.compile("hostname"));
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.declinedOffer);
+
+ assertEquals(offer.getId(), mock.declinedOffer);
+
+ offer = createOffer("hostnameAccept", 8, 10240);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+ assertEquals(1, mock.launchedTask.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPerNode() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 8, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.memoryPerNode(1024);
+ clustProp.cpusPerNode(2);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0);
+ assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0);
+
+ mock.clear();
+
+ offer = createOffer("hostname", 1, 2048);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ assertNotNull(mock.declinedOffer);
+ assertEquals(offer.getId(), mock.declinedOffer);
+
+ mock.clear();
+
+ offer = createOffer("hostname", 4, 512);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ assertNotNull(mock.declinedOffer);
+ assertEquals(offer.getId(), mock.declinedOffer);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIgniteFramework() throws Exception {
+ final String mesosUserValue = "userAAAAA";
+ final String mesosRoleValue = "role1";
+
+ IgniteFramework igniteFramework = new IgniteFramework() {
+ @Override protected String getUser() {
+ return mesosUserValue;
+ }
+
+ @Override protected String getRole() {
+ return mesosRoleValue;
+ }
+ };
+
+ Protos.FrameworkInfo info = igniteFramework.getFrameworkInfo();
+
+ String actualUserValue = info.getUser();
+ String actualRoleValue = info.getRole();
+
+ assertEquals(actualUserValue, mesosUserValue);
+ assertEquals(actualRoleValue, mesosRoleValue);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMesosRoleValidation() throws Exception {
+ List<String> failedRoleValues = Arrays.asList("", ".", "..", "-testRole",
+ "test/Role", "test\\Role", "test Role", null);
+
+ for (String failedRoleValue : failedRoleValues)
+ assertFalse(IgniteFramework.isRoleValid(failedRoleValue));
+ }
+
+ /**
+ * @param resourceType Resource type.
+ * @return Value.
+ */
+ private Double resources(List<Protos.Resource> resources, String resourceType) {
+ for (Protos.Resource resource : resources) {
+ if (resource.getName().equals(resourceType))
+ return resource.getScalar().getValue();
+ }
+
+ return null;
+ }
+
+ /**
+ * @param hostname Hostname
+ * @param cpu Cpu count.
+ * @param mem Mem size.
+ * @return Offer.
+ */
+ private Protos.Offer createOffer(String hostname, double cpu, double mem) {
+ return Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("1"))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("1"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1"))
+ .setHostname(hostname)
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(IgniteScheduler.CPU)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
+ .build())
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(IgniteScheduler.MEM)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
+ .build())
+ .build();
+ }
+
+ /**
+ * No-op implementation.
+ */
+ public static class DriverMock implements SchedulerDriver {
+ /** */
+ Collection<Protos.TaskInfo> launchedTask;
+
+ /** */
+ Protos.OfferID declinedOffer;
+
+ /**
+ * Clears launched task.
+ */
+ public void clear() {
+ launchedTask = null;
+ declinedOffer = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status start() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status stop(boolean failover) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status stop() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status abort() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status join() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status run() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status requestResources(Collection<Protos.Request> requests) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+ Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
+ launchedTask = tasks;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+ Collection<Protos.TaskInfo> tasks) {
+ launchedTask = tasks;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks,
+ Protos.Filters filters) {
+ launchedTask = tasks;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) {
+ launchedTask = tasks;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status killTask(Protos.TaskID taskId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status acceptOffers(Collection<Protos.OfferID> collection,
+ Collection<Protos.Offer.Operation> collection1, Protos.Filters filters) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
+ declinedOffer = offerId;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status declineOffer(Protos.OfferID offerId) {
+ declinedOffer = offerId;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status reviveOffers() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status reviveOffers(Collection<String> collection) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status suppressOffers() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status suppressOffers(Collection<String> collection) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status acknowledgeStatusUpdate(Protos.TaskStatus status) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status sendFrameworkMessage(Protos.ExecutorID executorId, Protos.SlaveID slaveId,
+ byte[] data) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status reconcileTasks(Collection<Protos.TaskStatus> statuses) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo, Collection<String> collection,
+ OfferConstraints offerConstraints) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo,
+ Collection<String> collection) {
+ return null;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7d96020..721cde1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
<module>modules/geospatial-ext</module>
<module>modules/aop-ext</module>
<module>modules/spark-ext</module>
+ <module>modules/mesos-ext</module>
</modules>
<profiles>