| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.shardingsphere.elasticjob.lite.internal.schedule; |
| |
| import com.google.common.base.Strings; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.shardingsphere.elasticjob.api.JobConfiguration; |
| import org.apache.shardingsphere.elasticjob.executor.JobFacade; |
| import org.apache.shardingsphere.elasticjob.infra.context.TaskContext; |
| import org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException; |
| import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener; |
| import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; |
| import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService; |
| import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverService; |
| import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionContextService; |
| import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService; |
| import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService; |
| import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; |
| import org.apache.shardingsphere.elasticjob.tracing.JobTracingEventBus; |
| import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; |
| import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent; |
| import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent; |
| import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source; |
| import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State; |
| |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Lite job facade. |
| */ |
| @Slf4j |
| public final class LiteJobFacade implements JobFacade { |
| |
| private final ConfigurationService configService; |
| |
| private final ShardingService shardingService; |
| |
| private final ExecutionContextService executionContextService; |
| |
| private final ExecutionService executionService; |
| |
| private final FailoverService failoverService; |
| |
| private final Collection<ElasticJobListener> elasticJobListeners; |
| |
| private final JobTracingEventBus jobTracingEventBus; |
| |
| public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners, final TracingConfiguration<?> tracingConfig) { |
| configService = new ConfigurationService(regCenter, jobName); |
| shardingService = new ShardingService(regCenter, jobName); |
| executionContextService = new ExecutionContextService(regCenter, jobName); |
| executionService = new ExecutionService(regCenter, jobName); |
| failoverService = new FailoverService(regCenter, jobName); |
| this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); |
| this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); |
| } |
| |
| @Override |
| public JobConfiguration loadJobConfiguration(final boolean fromCache) { |
| return configService.load(fromCache); |
| } |
| |
| @Override |
| public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException { |
| configService.checkMaxTimeDiffSecondsTolerable(); |
| } |
| |
| @Override |
| public void failoverIfNecessary() { |
| if (configService.load(true).isFailover()) { |
| failoverService.failoverIfNecessary(); |
| } |
| } |
| |
| @Override |
| public void registerJobBegin(final ShardingContexts shardingContexts) { |
| executionService.registerJobBegin(shardingContexts); |
| } |
| |
| @Override |
| public void registerJobCompleted(final ShardingContexts shardingContexts) { |
| executionService.registerJobCompleted(shardingContexts); |
| if (configService.load(true).isFailover()) { |
| failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); |
| } |
| } |
| |
| @Override |
| public ShardingContexts getShardingContexts() { |
| boolean isFailover = configService.load(true).isFailover(); |
| if (isFailover) { |
| List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems(); |
| if (!failoverShardingItems.isEmpty()) { |
| return executionContextService.getJobShardingContext(failoverShardingItems); |
| } |
| } |
| shardingService.shardingIfNecessary(); |
| List<Integer> shardingItems = shardingService.getLocalShardingItems(); |
| if (isFailover) { |
| shardingItems.removeAll(failoverService.getLocalTakeOffItems()); |
| } |
| shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); |
| return executionContextService.getJobShardingContext(shardingItems); |
| } |
| |
| @Override |
| public boolean misfireIfRunning(final Collection<Integer> shardingItems) { |
| return executionService.misfireIfHasRunningItems(shardingItems); |
| } |
| |
| @Override |
| public void clearMisfire(final Collection<Integer> shardingItems) { |
| executionService.clearMisfire(shardingItems); |
| } |
| |
| @Override |
| public boolean isExecuteMisfired(final Collection<Integer> shardingItems) { |
| return configService.load(true).isMisfire() && !isNeedSharding() && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); |
| } |
| |
| @Override |
| public boolean isNeedSharding() { |
| return shardingService.isNeedSharding(); |
| } |
| |
| @Override |
| public void beforeJobExecuted(final ShardingContexts shardingContexts) { |
| for (ElasticJobListener each : elasticJobListeners) { |
| each.beforeJobExecuted(shardingContexts); |
| } |
| } |
| |
| @Override |
| public void afterJobExecuted(final ShardingContexts shardingContexts) { |
| for (ElasticJobListener each : elasticJobListeners) { |
| each.afterJobExecuted(shardingContexts); |
| } |
| } |
| |
| @Override |
| public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { |
| jobTracingEventBus.post(jobExecutionEvent); |
| } |
| |
| @Override |
| public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { |
| TaskContext taskContext = TaskContext.from(taskId); |
| jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), |
| taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); |
| if (!Strings.isNullOrEmpty(message)) { |
| log.trace(message); |
| } |
| } |
| } |