blob: 1b39f2b26d4956b0f79c1081ce94edf4c888a93d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.compute.messaging;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.compute.ComputeUtils.cancelFromJobCancelResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.changePriorityFromJobChangePriorityResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.jobIdFromExecuteResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResultResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.statusesFromJobStatusesResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.CANCELLING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.FAIL_TO_GET_JOB_STATUS_ERR;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.ComputeMessageTypes;
import org.apache.ignite.internal.compute.ComputeMessagesFactory;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionManager;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobStarter;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.compute.message.JobCancelRequest;
import org.apache.ignite.internal.compute.message.JobCancelResponse;
import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
import org.apache.ignite.internal.compute.message.JobStatusResponse;
import org.apache.ignite.internal.compute.message.JobStatusesRequest;
import org.apache.ignite.internal.compute.message.JobStatusesResponse;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
/**
* Compute API internal messaging service.
*/
public class ComputeMessaging {
private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
private final ExecutionManager executionManager;
private final MessagingService messagingService;
private final TopologyService topologyService;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/**
* Constructor.
*
* @param executionManager Execution manager.
* @param messagingService Messaging service.
* @param topologyService Topology service.
*/
public ComputeMessaging(ExecutionManager executionManager, MessagingService messagingService, TopologyService topologyService) {
this.executionManager = executionManager;
this.messagingService = messagingService;
this.topologyService = topologyService;
}
/**
* Start messaging service.
*/
public void start(JobStarter starter) {
messagingService.addMessageHandler(ComputeMessageTypes.class, (message, sender, correlationId) -> {
assert correlationId != null;
if (!busyLock.enterBusy()) {
sendException(
message,
sender,
requireNonNull(correlationId, "correlationId is null"),
new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())
);
return;
}
try {
processRequest(message, sender, requireNonNull(correlationId), starter);
} finally {
busyLock.leaveBusy();
}
});
}
private void sendException(NetworkMessage message, ClusterNode sender, long correlationId, IgniteInternalException ex) {
if (message instanceof ExecuteRequest) {
sendExecuteResponse(null, ex, sender, correlationId);
} else if (message instanceof JobResultRequest) {
sendJobResultResponse(null, ex, sender, correlationId);
} else if (message instanceof JobStatusesRequest) {
sendJobStatusesResponse(null, ex, sender, correlationId);
} else if (message instanceof JobStatusRequest) {
sendJobStatusResponse(null, ex, sender, correlationId);
} else if (message instanceof JobCancelRequest) {
sendJobCancelResponse(null, ex, sender, correlationId);
} else if (message instanceof JobChangePriorityRequest) {
sendJobChangePriorityResponse(null, ex, sender, correlationId);
}
}
private void processRequest(NetworkMessage message, ClusterNode sender, long correlationId, JobStarter starter) {
if (message instanceof ExecuteRequest) {
processExecuteRequest(starter, (ExecuteRequest) message, sender, correlationId);
} else if (message instanceof JobResultRequest) {
processJobResultRequest((JobResultRequest) message, sender, correlationId);
} else if (message instanceof JobStatusesRequest) {
processJobStatusesRequest((JobStatusesRequest) message, sender, correlationId);
} else if (message instanceof JobStatusRequest) {
processJobStatusRequest((JobStatusRequest) message, sender, correlationId);
} else if (message instanceof JobCancelRequest) {
processJobCancelRequest((JobCancelRequest) message, sender, correlationId);
} else if (message instanceof JobChangePriorityRequest) {
processJobChangePriorityRequest((JobChangePriorityRequest) message, sender, correlationId);
}
}
/**
* Stop messaging service. After stop this service is not usable anymore.
*/
public void stop() {
busyLock.block();
}
/**
* Submit Compute job to execution on remote node.
*
* @param options Job execution options.
* @param remoteNode The job will be executed on this node.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
* @return Job id future that will be completed when the job is submitted on the remote node.
*/
public CompletableFuture<UUID> remoteExecuteRequestAsync(
ExecutionOptions options,
ClusterNode remoteNode,
List<DeploymentUnit> units,
String jobClassName,
Object[] args
) {
List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
.map(ComputeUtils::toDeploymentUnitMsg)
.collect(toList());
ExecuteRequest executeRequest = messagesFactory.executeRequest()
.executeOptions(options)
.deploymentUnits(deploymentUnitMsgs)
.jobClassName(jobClassName)
.args(args)
.build();
return messagingService.invoke(remoteNode, executeRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> jobIdFromExecuteResponse((ExecuteResponse) networkMessage));
}
private void processExecuteRequest(JobStarter starter, ExecuteRequest request, ClusterNode sender, long correlationId) {
List<DeploymentUnit> units = toDeploymentUnit(request.deploymentUnits());
JobExecution<Object> execution = starter.start(request.executeOptions(), units, request.jobClassName(), request.args());
execution.idAsync().whenComplete((jobId, err) -> sendExecuteResponse(jobId, err, sender, correlationId));
}
private void sendExecuteResponse(@Nullable UUID jobId, @Nullable Throwable ex, ClusterNode sender, Long correlationId) {
ExecuteResponse executeResponse = messagesFactory.executeResponse()
.jobId(jobId)
.throwable(ex)
.build();
messagingService.respond(sender, executeResponse, correlationId);
}
/**
* Gets compute job execution result from the remote node.
*
* @param remoteNode The job will be executed on this node.
* @param jobId Job id.
* @param <R> Job result type
* @return Job result.
*/
public <R> CompletableFuture<R> remoteJobResultRequestAsync(ClusterNode remoteNode, UUID jobId) {
JobResultRequest jobResultRequest = messagesFactory.jobResultRequest()
.jobId(jobId)
.build();
return messagingService.invoke(remoteNode, jobResultRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> resultFromJobResultResponse((JobResultResponse) networkMessage));
}
private void processJobResultRequest(JobResultRequest request, ClusterNode sender, long correlationId) {
executionManager.resultAsync(request.jobId())
.whenComplete((result, err) -> sendJobResultResponse(result, err, sender, correlationId));
}
private void sendJobResultResponse(@Nullable Object result, @Nullable Throwable ex, ClusterNode sender, long correlationId) {
JobResultResponse jobResultResponse = messagesFactory.jobResultResponse()
.result(result)
.throwable(ex)
.build();
messagingService.respond(sender, jobResultResponse, correlationId);
}
CompletableFuture<Collection<JobStatus>> remoteStatusesAsync(ClusterNode remoteNode) {
JobStatusesRequest jobStatusRequest = messagesFactory.jobStatusesRequest()
.build();
return messagingService.invoke(remoteNode, jobStatusRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> statusesFromJobStatusesResponse((JobStatusesResponse) networkMessage));
}
private void processJobStatusesRequest(JobStatusesRequest message, ClusterNode sender, long correlationId) {
executionManager.localStatusesAsync()
.whenComplete((statuses, throwable) -> sendJobStatusesResponse(statuses, throwable, sender, correlationId));
}
private void sendJobStatusesResponse(
@Nullable Collection<JobStatus> statuses,
@Nullable Throwable throwable,
ClusterNode sender,
Long correlationId
) {
JobStatusesResponse jobStatusResponse = messagesFactory.jobStatusesResponse()
.statuses(statuses)
.throwable(throwable)
.build();
messagingService.respond(sender, jobStatusResponse, correlationId);
}
/**
* Gets compute job status from the remote node.
*
* @param remoteNode The job will be executed on this node.
* @param jobId Compute job id.
* @return The current status of the job, or {@code null} if there's no job with the specified id.
*/
CompletableFuture<@Nullable JobStatus> remoteStatusAsync(ClusterNode remoteNode, UUID jobId) {
JobStatusRequest jobStatusRequest = messagesFactory.jobStatusRequest()
.jobId(jobId)
.build();
return messagingService.invoke(remoteNode, jobStatusRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> statusFromJobStatusResponse((JobStatusResponse) networkMessage));
}
private void processJobStatusRequest(JobStatusRequest request, ClusterNode sender, long correlationId) {
executionManager.statusAsync(request.jobId())
.whenComplete((status, throwable) -> sendJobStatusResponse(status, throwable, sender, correlationId));
}
private void sendJobStatusResponse(@Nullable JobStatus status, @Nullable Throwable throwable, ClusterNode sender, Long correlationId) {
JobStatusResponse jobStatusResponse = messagesFactory.jobStatusResponse()
.status(status)
.throwable(throwable)
.build();
messagingService.respond(sender, jobStatusResponse, correlationId);
}
/**
* Cancels compute job on the remote node.
*
* @param remoteNode The job will be canceled on this node.
* @param jobId Compute job id.
* @return The future which will be completed with {@code true} when the job is cancelled, {@code false} when the job couldn't be
* cancelled (either it's not yet started, or it's already completed), or {@code null} if there's no job with the specified id.
*/
CompletableFuture<@Nullable Boolean> remoteCancelAsync(ClusterNode remoteNode, UUID jobId) {
JobCancelRequest jobCancelRequest = messagesFactory.jobCancelRequest()
.jobId(jobId)
.build();
return messagingService.invoke(remoteNode, jobCancelRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> cancelFromJobCancelResponse((JobCancelResponse) networkMessage));
}
private void processJobCancelRequest(JobCancelRequest request, ClusterNode sender, long correlationId) {
executionManager.cancelAsync(request.jobId())
.whenComplete((result, err) -> sendJobCancelResponse(result, err, sender, correlationId));
}
private void sendJobCancelResponse(@Nullable Boolean result, @Nullable Throwable throwable, ClusterNode sender, Long correlationId) {
JobCancelResponse jobCancelResponse = messagesFactory.jobCancelResponse()
.result(result)
.throwable(throwable)
.build();
messagingService.respond(sender, jobCancelResponse, correlationId);
}
/**
* Changes compute job priority on the remote node.
*
* @param remoteNode The priority of the job will be changed on this node.
* @param jobId Compute job id.
* @param newPriority new job priority.
*
* @return Job change priority future (will be completed when change priority request is processed).
*/
CompletableFuture<@Nullable Boolean> remoteChangePriorityAsync(ClusterNode remoteNode, UUID jobId, int newPriority) {
JobChangePriorityRequest jobChangePriorityRequest = messagesFactory.jobChangePriorityRequest()
.jobId(jobId)
.priority(newPriority)
.build();
return messagingService.invoke(remoteNode, jobChangePriorityRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(networkMessage -> changePriorityFromJobChangePriorityResponse((JobChangePriorityResponse) networkMessage));
}
private void processJobChangePriorityRequest(JobChangePriorityRequest request, ClusterNode sender, long correlationId) {
executionManager.changePriorityAsync(request.jobId(), request.priority())
.whenComplete((result, err) -> sendJobChangePriorityResponse(result, err, sender, correlationId));
}
private void sendJobChangePriorityResponse(
@Nullable Boolean result,
@Nullable Throwable throwable,
ClusterNode sender,
Long correlationId
) {
JobChangePriorityResponse jobChangePriorityResponse = messagesFactory.jobChangePriorityResponse()
.throwable(throwable)
.result(result)
.build();
messagingService.respond(sender, jobChangePriorityResponse, correlationId);
}
/**
* Broadcasts job statuses request to all nodes in the cluster.
*
* @return The future which will be completed with the collection of statuses from all nodes.
*/
public CompletableFuture<Collection<JobStatus>> broadcastStatusesAsync() {
return broadcastAsyncAndCollect(
node -> remoteStatusesAsync(node),
throwable -> new ComputeException(
FAIL_TO_GET_JOB_STATUS_ERR,
"Failed to retrieve statuses",
throwable
)).thenApply(statuses -> {
return statuses.stream()
.flatMap(Collection::stream)
.filter(Objects::nonNull)
.collect(toList());
});
}
/**
* Broadcasts job status request to all nodes in the cluster.
*
* @param jobId Job id.
* @return The current status of the job, or {@code null} if the job status no longer exists due to exceeding the retention time limit.
*/
public CompletableFuture<@Nullable JobStatus> broadcastStatusAsync(UUID jobId) {
return broadcastAsync(
node -> remoteStatusAsync(node, jobId),
throwable -> new ComputeException(
FAIL_TO_GET_JOB_STATUS_ERR,
"Failed to retrieve status of the job with ID: " + jobId,
throwable
));
}
/**
* Broadcasts job cancel request to all nodes in the cluster.
*
* @param jobId Job id.
* @return The future which will be completed with {@code true} when the job is cancelled, {@code false} when the job couldn't be
* cancelled (either it's not yet started, or it's already completed), or {@code null} if there's no job with the specified id.
*/
public CompletableFuture<@Nullable Boolean> broadcastCancelAsync(UUID jobId) {
return broadcastAsync(
node -> remoteCancelAsync(node, jobId),
throwable -> new ComputeException(
CANCELLING_ERR,
"Failed to cancel job with ID: " + jobId,
throwable
));
}
/**
* Broadcasts job priority change request to all nodes in the cluster.
*
* @param jobId Job id.
* @param newPriority New priority.
* @return The future which will be completed with {@code true} when the priority is changed, {@code false} when the priority couldn't
* be changed (it's already executing or completed), or {@code null} if there's no job with the specified id.
*/
public CompletableFuture<@Nullable Boolean> broadcastChangePriorityAsync(UUID jobId, int newPriority) {
return broadcastAsync(
node -> remoteChangePriorityAsync(node, jobId, newPriority),
throwable -> new ComputeException(
CHANGE_JOB_PRIORITY_ERR,
"Failed to change priority for job with ID: " + jobId,
throwable
));
}
/**
* Broadcasts a request to all nodes in the cluster.
*
* @param request Function which maps a node to the request future.
* @param error Function which creates a specific error from the exception thrown from the request.
* @return The future which will be completed when request is processed.
*/
private <R> CompletableFuture<@Nullable R> broadcastAsync(
Function<ClusterNode, CompletableFuture<@Nullable R>> request,
Function<Throwable, Throwable> error
) {
CompletableFuture<@Nullable R> result = new CompletableFuture<>();
ClusterNode localMember = topologyService.localMember();
CompletableFuture<?>[] futures = topologyService.allMembers()
.stream()
.filter(node -> !node.equals(localMember))
.map(node -> request.apply(node)
.thenAccept(response -> {
if (response != null) {
result.complete(response);
}
}))
.toArray(CompletableFuture[]::new);
allOf(futures).whenComplete((unused, throwable) -> {
// If none of the nodes returned non-null status it means that either we couldn't find the status
// or the node which had the status thrown an exception. If any of the futures completed exceptionally
// but the result is non-null, then ignore the exceptions from other futures.
if (!result.isDone()) {
// allOf will complete exceptionally if any of the futures failed, so this condition means that we
// successfully couldn't find a status.
if (throwable == null) {
result.complete(null);
return;
}
result.completeExceptionally(error.apply(throwable));
}
});
return result;
}
private <R> CompletableFuture<List<R>> broadcastAsyncAndCollect(
Function<ClusterNode, CompletableFuture<@Nullable R>> request,
Function<Throwable, RuntimeException> error
) {
CompletableFuture<R>[] futures = topologyService.allMembers()
.stream()
.map(request::apply)
.toArray(CompletableFuture[]::new);
return CompletableFutures.allOf(futures).exceptionally(throwable -> {
throw error.apply(throwable);
});
}
}