| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| = Distributed Computing |
| |
| Ignite 3 provides an API for distributing computations across cluster nodes in a balanced and fault-tolerant manner. You can submit individual tasks for execution from Java, .NET, and C++ clients. |
| |
| You can use the Java, .NET, or C++ client to execute compute jobs. Make sure the required classes are deployed to the cluster before executing code. |
| |
| The examples below assume that the `NodeNameJob` class has been deployed to the cluster by using link:developers-guide/code-deployment[code deployment]. |
| |
| //== Synchronous Computation |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source, java] |
| ---- |
| private void example() { |
| IgniteClient client = client(); |
| IgniteCompute compute = client.compute(); |
| Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes()); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1"))); |
| |
| JobExecution<String> execution = compute.executeAsync(nodes, units, NodeNameJob.class, "Hello"); |
| // Wait for the job to finish and get its result. |
| String result = execution.resultAsync().join(); |
| } |
| ---- |
| |
| |
| NOTE: Unlike in Ignite 2, jobs are not serialized. Only the class name and arguments are sent to the node. |
| |
| tab:.NET[] |
| [source, csharp] |
| ---- |
| IIgniteClient client = Client; |
| ICompute compute = client.Compute; |
| IList<IClusterNode> nodes = await client.GetClusterNodesAsync(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") }; |
| |
| IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello"); |
| // Wait for the job to finish and get its result. |
| string result = await execution.GetResultAsync(); |
| ---- |
| |
| tab:C++[] |
| [source, cpp] |
| ---- |
| using namespace ignite; |
| |
| compute comp = client.get_compute(); |
| std::vector<cluster_node> nodes = client.get_nodes(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}}; |
| |
| job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {}); |
| std::string result = execution.get_result()->get<std::string>(); |
| ---- |
| -- |
| |
| //== Asynchronous Computation |
| // Placeholder for when API is complete |
| |
| == Job Ownership |
| |
| If the cluster has link:security/authentication[Authentication] enabled, compute jobs are executed on behalf of a specific user. If user permissions are configured on the cluster, the user needs the appropriate link:security/permissions#distributed-computing[distributed computing permissions] to work with distributed computing jobs. Only users with the `JOBS_ADMIN` action can interact with jobs of other users. |
| |
| == Job Execution States |
| |
| You can keep track of the status of the job on the server and react to status changes. For example: |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source, java] |
| ---- |
| private void example() { |
| IgniteClient client = client(); |
| IgniteCompute compute = client.compute(); |
| Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes()); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1"))); |
| |
| JobExecution<String> execution = compute.executeAsync(nodes, units, NodeNameJob.class, "Hello"); |
| |
| // The status may be unavailable (null), so check it before reading the state. |
| // thenAccept is used because the callback only reacts to the status and returns nothing. |
| execution.statusAsync().thenAccept(status -> { |
| if (status != null && status.state() == JobState.FAILED) { |
| // Handle failure |
| } |
| }); |
| |
| // Wait for the job to finish and get its result. |
| String result = execution.resultAsync().join(); |
| } |
| ---- |
| |
| tab:.NET[] |
| [source, csharp] |
| ---- |
| IIgniteClient client = Client; |
| ICompute compute = client.Compute; |
| IList<IClusterNode> nodes = await client.GetClusterNodesAsync(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") }; |
| |
| IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello"); |
| |
| // The status is nullable, so read the state with the null-conditional operator. |
| JobStatus? status = await execution.GetStatusAsync(); |
| |
| if (status?.State == JobState.Failed) |
| { |
| // Handle failure |
| } |
| |
| string result = await execution.GetResultAsync(); |
| ---- |
| |
| tab:C++[] |
| [source, cpp] |
| ---- |
| using namespace ignite; |
| |
| compute comp = client.get_compute(); |
| std::vector<cluster_node> nodes = client.get_nodes(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}}; |
| |
| job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {}); |
| |
| std::optional<job_status> status = execution.get_status(); |
| if (status && status->state == job_state::FAILED) |
| { |
| // Handle failure |
| } |
| std::string result = execution.get_result()->get<std::string>(); |
| ---- |
| -- |
| |
| The table below lists the possible job statuses: |
| |
| [width="100%",cols="20%,60%,20%"] |
| |======================================================================= |
| |Status |Description |Transitions to |
| |
| | `Submitted` | The job was created and sent to the cluster, but not yet processed. | `Queued`, `Canceled` |
| | `Queued` | The job was added to the queue and is waiting for execution. | `Executing`, `Canceled` |
| | `Executing` | The job is being executed. | `Canceling`, `Completed`, `Failed`, `Queued` |
| | `Completed` | The job was executed successfully and the execution result was returned. | |
| | `Failed` | The job was unexpectedly terminated during execution. | `Queued` |
| | `Canceling` | The job has received the cancel command, but is still running. | `Completed`, `Canceled` |
| | `Canceled` | The job was successfully canceled. | |
| |
| |======================================================================= |
| |
| If all job execution threads are busy, new jobs received by the node are put into the job queue according to their <<Job Priority>>. Ignite sorts all incoming jobs first by priority and then by time, so jobs with the same priority are executed in the order in which they were queued. |
| |
| === Cancelling Executing Jobs |
| |
| When a node receives a command to cancel a job in the `Executing` state, it immediately sends an interrupt to the thread that is executing the job. In most cases, this causes the job to be canceled right away; however, in some cases the job continues to run. If this happens, the job stays in the `Canceling` state. Depending on the specific code being executed, the job may complete successfully, be canceled once the uninterruptible operation finishes, or remain unfinished (for example, if the code is stuck in a loop). You can use the `JobExecution.statusAsync()` method to keep track of the job status and react to status changes. |
| |
| |
| == Job Configuration |
| |
| === Job Priority |
| |
| You can specify a job priority by setting the `JobExecutionOptions.priority` property. Jobs with a higher priority are executed before jobs with a lower priority (for example, a job with priority 4 is executed before a job with priority 2). |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source, java] |
| ---- |
| private void example() { |
| IgniteClient client = client(); |
| IgniteCompute compute = client.compute(); |
| Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes()); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1"))); |
| |
| // Create job execution options with priority 1. |
| JobExecutionOptions options = JobExecutionOptions.builder().priority(1).build(); |
| |
| JobExecution<String> execution = compute.executeAsync(nodes, units, NodeNameJob.class, options, "Hello"); |
| // Wait for the job to finish and get its result. |
| String result = execution.resultAsync().join(); |
| } |
| ---- |
| |
| tab:.NET[] |
| [source, csharp] |
| ---- |
| IIgniteClient client = Client; |
| ICompute compute = client.Compute; |
| IList<IClusterNode> nodes = await client.GetClusterNodesAsync(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") }; |
| |
| // Create job execution options with priority 1. |
| var options = JobExecutionOptions.Default with { Priority = 1 }; |
| |
| IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello"); |
| string result = await execution.GetResultAsync(); |
| ---- |
| |
| tab:C++[] |
| [source, cpp] |
| ---- |
| using namespace ignite; |
| |
| compute comp = client.get_compute(); |
| std::vector<cluster_node> nodes = client.get_nodes(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}}; |
| |
| job_execution_options options{1, 0}; |
| job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options)); |
| std::string result = execution.get_result()->get<std::string>(); |
| ---- |
| -- |
| |
| === Job Retries |
| |
| You can set the number of times a job is retried on failure by setting the `JobExecutionOptions.maxRetries` property. If set, a failed job is retried up to the specified number of times before moving to the `Failed` state. |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source, java] |
| ---- |
| private void example() { |
| IgniteClient client = client(); |
| IgniteCompute compute = client.compute(); |
| Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes()); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1"))); |
| |
| // Create job execution options that allow up to 5 retries. |
| JobExecutionOptions options = JobExecutionOptions.builder().maxRetries(5).build(); |
| |
| JobExecution<String> execution = compute.executeAsync(nodes, units, NodeNameJob.class, options, "Hello"); |
| // Wait for the job to finish and get its result. |
| String result = execution.resultAsync().join(); |
| } |
| ---- |
| |
| tab:.NET[] |
| [source, csharp] |
| ---- |
| IIgniteClient client = Client; |
| ICompute compute = client.Compute; |
| IList<IClusterNode> nodes = await client.GetClusterNodesAsync(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") }; |
| |
| // Create job execution options that allow up to 5 retries. |
| var options = JobExecutionOptions.Default with { MaxRetries = 5 }; |
| |
| IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello"); |
| string result = await execution.GetResultAsync(); |
| ---- |
| |
| tab:C++[] |
| [source, cpp] |
| ---- |
| using namespace ignite; |
| |
| compute comp = client.get_compute(); |
| std::vector<cluster_node> nodes = client.get_nodes(); |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}}; |
| |
| job_execution_options options{0, 5}; |
| job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options)); |
| std::string result = execution.get_result()->get<std::string>(); |
| ---- |
| -- |
| |
| == Job Failover |
| |
| Ignite 3 implements mechanisms to handle issues that happen during job execution. The following situations are handled: |
| |
| === Worker Node Shutdown |
| |
| If the [.tooltip]#worker node# is shut down, the [.tooltip]#coordinator node# redistributes all jobs assigned to that worker to other viable nodes. If no viable nodes are found, the job fails and an exception is sent to the client. |
| |
| === Coordinator Node Shutdown |
| |
| If the coordinator node shuts down, all jobs will be cancelled as soon as the node detects that the coordinator is shut down. Note that link:compute/compute#cancelling-executing-jobs[some jobs] may take a long time to cancel. |
| |
| === Client Disconnect |
| |
| If the client disconnects, all jobs will be cancelled as soon as the coordinator node detects the disconnect. Note that link:compute/compute#cancelling-executing-jobs[some jobs] may take a long time to cancel. |
| |
| == Colocated Computations |
| |
| In Ignite 3, you can execute a colocated computation with the `executeColocated` method. In this case, the compute task is guaranteed to be executed on the nodes that hold the specified key. This can significantly reduce execution time if your task needs to access that data. |
| |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source, java] |
| ---- |
| private void example() { |
| IgniteClient client = client(); |
| IgniteCompute compute = client.compute(); |
| String table = "Person"; |
| String key = "John"; |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1"))); |
| |
| // The job is routed to a node that holds `key` in `table`. |
| JobExecution<String> execution = compute.executeColocatedAsync(table, key, units, NodeNameJob.class, "Hello"); |
| String result = execution.resultAsync().join(); |
| } |
| ---- |
| |
| tab:.NET[] |
| [source, csharp] |
| ---- |
| IIgniteClient client = Client; |
| ICompute compute = client.Compute; |
| string table = "Person"; |
| string key = "John"; |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") }; |
| |
| // Execution options precede the job arguments, as in the other SubmitAsync calls on this page. |
| IJobExecution<string> execution = await compute.SubmitColocatedAsync<string, string>(table, key, units, NodeNameJob, JobExecutionOptions.Default, "Hello"); |
| string result = await execution.GetResultAsync(); |
| ---- |
| tab:C++[] |
| [source, cpp] |
| ---- |
| using namespace ignite; |
| |
| compute comp = client.get_compute(); |
| std::string table{"Person"}; |
| std::string key{"John"}; |
| |
| // Unit `unitName:1.1.1` contains NodeNameJob class. |
| std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}}; |
| |
| job_execution execution = comp.submit_colocated(table, key, units, NODE_NAME_JOB, {std::string("Hello")}, {}); |
| std::string result = execution.get_result()->get<std::string>(); |
| ---- |
| -- |