blob: 3d5c9d79d03204c2521879f4c376342f8fc2a5ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.fakes;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.Nullable;
/**
* Fake {@link IgniteCompute}.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class FakeCompute implements IgniteComputeInternal {
public static final String GET_UNITS = "get-units";
public static volatile @Nullable CompletableFuture future;
public static volatile @Nullable RuntimeException err;
private final Map<UUID, JobStatus> statuses = new ConcurrentHashMap<>();
public static volatile CountDownLatch latch = new CountDownLatch(0);
private final String nodeName;
public FakeCompute(String nodeName) {
this.nodeName = nodeName;
}
@Override
public <R> JobExecution<R> submit(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args) {
return executeAsyncWithFailover(nodes, units, jobClassName, options, args);
}
@Override
public <R> JobExecution<R> executeAsyncWithFailover(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args) {
if (Objects.equals(jobClassName, GET_UNITS)) {
String unitString = units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
return completedExecution((R) unitString);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (err != null) {
throw err;
}
return jobExecution(future != null ? future : completedFuture((R) nodeName));
}
/** {@inheritDoc} */
@Override
public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(TableViewInternal table, Tuple key, List<DeploymentUnit> units,
String jobClassName, JobExecutionOptions options, Object[] args) {
return completedFuture(jobExecution(future != null ? future : completedFuture((R) nodeName)));
}
/** {@inheritDoc} */
@Override
public <R> R execute(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return sync(executeAsync(nodes, units, jobClassName, options, args));
}
@Override
public <R> JobExecution<R> submitColocated(
String tableName,
Tuple key,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return jobExecution(future != null ? future : completedFuture((R) nodeName));
}
@Override
public <K, R> JobExecution<R> submitColocated(
String tableName,
K key,
Mapper<K> keyMapper,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return jobExecution(future != null ? future : completedFuture((R) nodeName));
}
/** {@inheritDoc} */
@Override
public <R> R executeColocated(
String tableName,
Tuple key,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
}
/** {@inheritDoc} */
@Override
public <K, R> R executeColocated(
String tableName,
K key,
Mapper<K> keyMapper,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
}
@Override
public <R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
Object... args
) {
return null;
}
@Override
public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
return null;
}
@Override
public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
return sync(executeMapReduceAsync(units, taskClassName, args));
}
private <R> JobExecution<R> completedExecution(R result) {
return jobExecution(completedFuture(result));
}
private <R> JobExecution<R> jobExecution(CompletableFuture<R> result) {
UUID jobId = UUID.randomUUID();
JobStatus status = JobStatus.builder()
.id(jobId)
.state(EXECUTING)
.createTime(Instant.now())
.startTime(Instant.now())
.build();
statuses.put(jobId, status);
result.whenComplete((r, throwable) -> {
JobState state = throwable != null ? FAILED : COMPLETED;
JobStatus newStatus = status.toBuilder().state(state).finishTime(Instant.now()).build();
statuses.put(jobId, newStatus);
});
return new JobExecution<>() {
@Override
public CompletableFuture<R> resultAsync() {
return result;
}
@Override
public CompletableFuture<@Nullable JobStatus> statusAsync() {
return completedFuture(statuses.get(jobId));
}
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync() {
return trueCompletedFuture();
}
@Override
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
return trueCompletedFuture();
}
};
}
@Override
public CompletableFuture<Collection<JobStatus>> statusesAsync() {
return completedFuture(statuses.values());
}
@Override
public CompletableFuture<@Nullable JobStatus> statusAsync(UUID jobId) {
return completedFuture(statuses.get(jobId));
}
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
return trueCompletedFuture();
}
@Override
public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
return trueCompletedFuture();
}
private static <R> R sync(CompletableFuture<R> future) {
try {
return future.join();
} catch (CompletionException e) {
throw ExceptionUtils.wrap(e);
}
}
}