blob: bcb1dd85924e2e32f1d1f14c856fa17b7c862880 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
/**
 * Client-side handle for a single job, identified by its job id.
 *
 * <p>All operations are carried out by encoding a request with the matching codec and sending it
 * through the wrapped {@link SeaTunnelHazelcastClient}.
 */
public class ClientJobProxy implements Job {
    private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);

    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;

    /** Boxed job id; it is unboxed by {@link #getJobId()} and by the request codecs. */
    private final Long jobId;

    /**
     * Value obtained by the most recent {@link #waitForJobCompleteV2()} call; {@code null} until
     * such a call has produced a result.
     */
    private JobResult jobResult;

    /**
     * Creates a proxy for a new job and submits it immediately. The constructor blocks until the
     * submit request has completed.
     *
     * @param seaTunnelHazelcastClient client used to send all requests for this job
     * @param jobImmutableInformation description of the job to submit; also supplies the job id
     */
    public ClientJobProxy(
            @NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
            @NonNull JobImmutableInformation jobImmutableInformation) {
        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
        this.jobId = jobImmutableInformation.getJobId();
        submitJob(jobImmutableInformation);
    }

    /**
     * Creates a proxy for a job id without submitting anything.
     *
     * @param seaTunnelHazelcastClient client used to send all requests for this job
     * @param jobId id of the job this proxy refers to
     */
    public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient, Long jobId) {
        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
        this.jobId = jobId;
    }

    /** @return the id of the job this proxy refers to */
    @Override
    public long getJobId() {
        return jobId;
    }

    /**
     * Serializes the job information, sends the submit request and blocks until it completes.
     *
     * @param jobImmutableInformation description of the job to submit
     */
    private void submitJob(JobImmutableInformation jobImmutableInformation) {
        LOGGER.info(
                String.format(
                        "Start submit job, job id: %s, with plugin jar %s",
                        jobImmutableInformation.getJobId(),
                        jobImmutableInformation.getPluginJarsUrls()));
        ClientMessage request =
                SeaTunnelSubmitJobCodec.encodeRequest(
                        jobImmutableInformation.getJobId(),
                        seaTunnelHazelcastClient
                                .getSerializationService()
                                .toData(jobImmutableInformation));
        PassiveCompletableFuture<Void> submitJobFuture =
                seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
        // join() blocks the caller until the submit request has been answered.
        submitJobFuture.join();
        LOGGER.info(
                String.format(
                        "Submit job finished, job id: %s, job name: %s",
                        jobImmutableInformation.getJobId(), jobImmutableInformation.getJobName()));
    }

    /**
     * Blocks until the job result is available, retrying the wait request through {@link
     * RetryUtils} when {@link ExceptionUtil#isOperationNeedRetryException} accepts the failure.
     *
     * <p>The obtained result is also kept and can be read later via {@link #getJobResultCache()}.
     *
     * @return the job result; never {@code null}
     * @throws RuntimeException wrapping any failure, including the case where no result could be
     *     fetched. If the failure is an {@link InterruptedException}, the calling thread's
     *     interrupt flag is set again before the exception is thrown.
     */
    @Override
    public JobResult waitForJobCompleteV2() {
        try {
            jobResult =
                    RetryUtils.retryWithException(
                            () -> {
                                PassiveCompletableFuture<JobResult> jobFuture =
                                        doWaitForJobComplete();
                                return jobFuture.get();
                            },
                            new RetryUtils.RetryMaterial(
                                    100000,
                                    true,
                                    ExceptionUtil::isOperationNeedRetryException,
                                    Constant.OPERATION_RETRY_SLEEP));
            if (jobResult == null) {
                throw new SeaTunnelEngineException("failed to fetch job result");
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the interrupt status; restore it so that
                // code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            LOGGER.severe(
                    String.format(
                            "Job (%s) end with unknown state, and throw Exception: %s",
                            jobId, ExceptionUtils.getMessage(e)));
            throw new RuntimeException(e);
        }
        LOGGER.info(String.format("Job (%s) end with state %s", jobId, jobResult.getStatus()));
        return jobResult;
    }

    /**
     * Returns the result stored by the last {@link #waitForJobCompleteV2()} call without sending
     * any request.
     *
     * @return the stored result, or {@code null} if none has been obtained yet
     */
    public JobResult getJobResultCache() {
        return jobResult;
    }

    /**
     * Sends a wait-for-job-complete request and returns immediately with a future for its answer.
     *
     * @return a future that yields the response converted to an object by the client's
     *     serialization service
     */
    @Override
    public PassiveCompletableFuture<JobResult> doWaitForJobComplete() {
        return new PassiveCompletableFuture<>(
                seaTunnelHazelcastClient
                        .requestOnMasterAndGetCompletableFuture(
                                SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobId),
                                SeaTunnelWaitForJobCompleteCodec::decodeResponse)
                        .thenApply(
                                serializedResult ->
                                        seaTunnelHazelcastClient
                                                .getSerializationService()
                                                .toObject(serializedResult)));
    }

    /** Sends a cancel request for this job and blocks until the request completes. */
    @Override
    public void cancelJob() {
        PassiveCompletableFuture<Void> cancelFuture =
                seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
                        SeaTunnelCancelJobCodec.encodeRequest(jobId));
        cancelFuture.join();
    }

    /**
     * Requests the current status of this job.
     *
     * @return the {@link JobStatus} whose position in {@code JobStatus.values()} equals the
     *     integer carried by the response
     */
    @Override
    public JobStatus getJobStatus() {
        int jobStatusOrdinal =
                seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(
                        SeaTunnelGetJobStatusCodec.encodeRequest(jobId),
                        SeaTunnelGetJobStatusCodec::decodeResponse);
        // The response carries the status as an index into the enum's declaration order.
        return JobStatus.values()[jobStatusOrdinal];
    }
}