blob: c7652cccaddcc040b5254a4a58a7cad44df0ec31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.util.json;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import org.apache.shardingsphere.elasticjob.cloud.api.JobType;
import org.apache.shardingsphere.elasticjob.cloud.config.JobTypeConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.script.ScriptJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobCoreConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.JobRootConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.dataflow.DataflowJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.config.simple.SimpleJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.executor.handler.JobProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Job configuration gson type adapter.
*
* @param <T> type of job root configuration
*/
public abstract class AbstractJobConfigurationGsonTypeAdapter<T extends JobRootConfiguration> extends TypeAdapter<T> {
    
    /**
     * Read a job root configuration from a JSON object.
     *
     * <p>Recognized top-level properties populate the core and type-specific configuration;
     * every other property is handed to {@link #addToCustomizedValueMap(String, JsonReader, Map)}.
     * Properties that are absent keep the defaults declared at the top of this method
     * (empty strings, {@code 0}, {@code false}, and a {@code null} job type).</p>
     *
     * @param in JSON reader positioned at the beginning of the configuration object
     * @return job root configuration produced by {@link #getJobRootConfiguration(JobTypeConfiguration, Map)}
     * @throws IOException if reading from the JSON reader fails
     * @throws NullPointerException if the JSON contains no {@code jobType}
     * @throws IllegalArgumentException if {@code jobType} is not a {@link JobType} constant name,
     *     or if {@code jobClass} is empty for a SIMPLE or DATAFLOW job
     */
    @Override
    public T read(final JsonReader in) throws IOException {
        String jobName = "";
        String cron = "";
        int shardingTotalCount = 0;
        String shardingItemParameters = "";
        String jobParameter = "";
        boolean failover = false;
        // Defaults to false when the property is missing from the JSON.
        boolean misfire = false;
        String description = "";
        JobProperties jobProperties = new JobProperties();
        JobType jobType = null;
        String jobClass = "";
        boolean streamingProcess = false;
        String scriptCommandLine = "";
        Map<String, Object> customizedValueMap = new HashMap<>(32, 1);
        in.beginObject();
        while (in.hasNext()) {
            String jsonName = in.nextName();
            switch (jsonName) {
                case "jobName":
                    jobName = in.nextString();
                    break;
                case "cron":
                    cron = in.nextString();
                    break;
                case "shardingTotalCount":
                    shardingTotalCount = in.nextInt();
                    break;
                case "shardingItemParameters":
                    shardingItemParameters = in.nextString();
                    break;
                case "jobParameter":
                    jobParameter = in.nextString();
                    break;
                case "failover":
                    failover = in.nextBoolean();
                    break;
                case "misfire":
                    misfire = in.nextBoolean();
                    break;
                case "description":
                    description = in.nextString();
                    break;
                case "jobProperties":
                    jobProperties = getJobProperties(in);
                    break;
                case "jobType":
                    jobType = JobType.valueOf(in.nextString());
                    break;
                case "jobClass":
                    jobClass = in.nextString();
                    break;
                case "streamingProcess":
                    streamingProcess = in.nextBoolean();
                    break;
                case "scriptCommandLine":
                    scriptCommandLine = in.nextString();
                    break;
                default:
                    // Unknown to this base class: let the concrete adapter decide what to do with it.
                    addToCustomizedValueMap(jsonName, in, customizedValueMap);
                    break;
            }
        }
        in.endObject();
        JobCoreConfiguration coreConfig = getJobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters,
                jobParameter, failover, misfire, description, jobProperties);
        JobTypeConfiguration typeConfig = getJobTypeConfiguration(coreConfig, jobType, jobClass, streamingProcess, scriptCommandLine);
        return getJobRootConfiguration(typeConfig, customizedValueMap);
    }
    
    /**
     * Read the nested {@code jobProperties} JSON object.
     *
     * <p>Only {@code job_exception_handler} and {@code executor_service_handler} are stored.
     * Any other property is skipped, value included, so that the reader stays positioned
     * on the next property name.</p>
     *
     * @param in JSON reader positioned at the beginning of the job properties object
     * @return job properties holding the recognized entries that were present
     * @throws IOException if reading from the JSON reader fails
     */
    private JobProperties getJobProperties(final JsonReader in) throws IOException {
        JobProperties result = new JobProperties();
        in.beginObject();
        while (in.hasNext()) {
            switch (in.nextName()) {
                case "job_exception_handler":
                    result.put(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), in.nextString());
                    break;
                case "executor_service_handler":
                    result.put(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), in.nextString());
                    break;
                default:
                    // The value must be consumed; otherwise the next nextName() call fails
                    // because the reader is still positioned on this property's value.
                    in.skipValue();
                    break;
            }
        }
        in.endObject();
        return result;
    }
    
    /**
     * Handle a top-level JSON property that this base class does not recognize.
     *
     * <p>Called from {@link #read(JsonReader)} right after the property name has been read.
     * NOTE(review): implementations presumably have to consume the property's value from the
     * reader (read it or skip it), otherwise the enclosing loop cannot advance to the next
     * name — confirm in subclasses.</p>
     *
     * @param jsonName name of the unrecognized property
     * @param in JSON reader positioned at the property's value
     * @param customizedValueMap map collecting customized values, later passed to
     *     {@link #getJobRootConfiguration(JobTypeConfiguration, Map)}
     * @throws IOException if reading from the JSON reader fails
     */
    protected abstract void addToCustomizedValueMap(String jsonName, JsonReader in, Map<String, Object> customizedValueMap) throws IOException;
    
    /**
     * Build the job core configuration from the values read from JSON.
     *
     * @param jobName job name
     * @param cron cron expression
     * @param shardingTotalCount sharding total count
     * @param shardingItemParameters sharding item parameters
     * @param jobParameter job parameter
     * @param failover failover flag
     * @param misfire misfire flag
     * @param description job description
     * @param jobProperties job properties supplying the exception handler and executor service handler entries
     * @return job core configuration
     */
    private JobCoreConfiguration getJobCoreConfiguration(final String jobName, final String cron, final int shardingTotalCount,
                                                         final String shardingItemParameters, final String jobParameter, final boolean failover,
                                                         final boolean misfire, final String description,
                                                         final JobProperties jobProperties) {
        return JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters).jobParameter(jobParameter).failover(failover).misfire(misfire).description(description)
                .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobProperties.get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER))
                .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), jobProperties.get(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER))
                .build();
    }
    
    /**
     * Build the type-specific job configuration for the given job type.
     *
     * @param coreConfig job core configuration
     * @param jobType job type, must not be null
     * @param jobClass job class name, must not be empty for SIMPLE and DATAFLOW jobs
     * @param streamingProcess streaming process flag, used by DATAFLOW jobs only
     * @param scriptCommandLine script command line, used by SCRIPT jobs only
     * @return simple, dataflow or script job configuration
     * @throws NullPointerException if {@code jobType} is null
     * @throws IllegalArgumentException if {@code jobClass} is null or empty for a SIMPLE or DATAFLOW job
     * @throws UnsupportedOperationException if the job type is none of SIMPLE, DATAFLOW and SCRIPT
     */
    private JobTypeConfiguration getJobTypeConfiguration(
            final JobCoreConfiguration coreConfig, final JobType jobType, final String jobClass, final boolean streamingProcess, final String scriptCommandLine) {
        Preconditions.checkNotNull(jobType, "jobType cannot be null.");
        switch (jobType) {
            case SIMPLE:
                Preconditions.checkArgument(!Strings.isNullOrEmpty(jobClass), "jobClass cannot be empty.");
                return new SimpleJobConfiguration(coreConfig, jobClass);
            case DATAFLOW:
                Preconditions.checkArgument(!Strings.isNullOrEmpty(jobClass), "jobClass cannot be empty.");
                return new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
            case SCRIPT:
                return new ScriptJobConfiguration(coreConfig, scriptCommandLine);
            default:
                throw new UnsupportedOperationException(String.valueOf(jobType));
        }
    }
    
    /**
     * Create the concrete job root configuration.
     *
     * @param typeConfig job type configuration built from the recognized JSON properties
     * @param customizedValueMap values collected by {@link #addToCustomizedValueMap(String, JsonReader, Map)}
     * @return job root configuration
     */
    protected abstract T getJobRootConfiguration(JobTypeConfiguration typeConfig, Map<String, Object> customizedValueMap);
    
    /**
     * Write a job root configuration as a JSON object.
     *
     * <p>The common properties are written first, then {@code streamingProcess} for DATAFLOW jobs
     * or {@code scriptCommandLine} for SCRIPT jobs, and finally whatever
     * {@link #writeCustomized(JsonWriter, JobRootConfiguration)} adds before the object is closed.</p>
     *
     * @param out JSON writer
     * @param value job root configuration to serialize
     * @throws IOException if writing to the JSON writer fails
     */
    @Override
    public void write(final JsonWriter out, final T value) throws IOException {
        JobTypeConfiguration typeConfig = value.getTypeConfig();
        JobCoreConfiguration coreConfig = typeConfig.getCoreConfig();
        out.beginObject();
        out.name("jobName").value(coreConfig.getJobName());
        out.name("jobClass").value(typeConfig.getJobClass());
        out.name("jobType").value(typeConfig.getJobType().name());
        out.name("cron").value(coreConfig.getCron());
        out.name("shardingTotalCount").value(coreConfig.getShardingTotalCount());
        out.name("shardingItemParameters").value(coreConfig.getShardingItemParameters());
        out.name("jobParameter").value(coreConfig.getJobParameter());
        out.name("failover").value(coreConfig.isFailover());
        out.name("misfire").value(coreConfig.isMisfire());
        out.name("description").value(coreConfig.getDescription());
        // The job properties already provide their own JSON text, so it is emitted verbatim.
        out.name("jobProperties").jsonValue(coreConfig.getJobProperties().json());
        if (typeConfig.getJobType() == JobType.DATAFLOW) {
            DataflowJobConfiguration dataflowJobConfig = (DataflowJobConfiguration) typeConfig;
            out.name("streamingProcess").value(dataflowJobConfig.isStreamingProcess());
        } else if (typeConfig.getJobType() == JobType.SCRIPT) {
            ScriptJobConfiguration scriptJobConfig = (ScriptJobConfiguration) typeConfig;
            out.name("scriptCommandLine").value(scriptJobConfig.getScriptCommandLine());
        }
        writeCustomized(out, value);
        out.endObject();
    }
    
    /**
     * Write the subclass-specific properties of the configuration.
     *
     * <p>Called from {@link #write(JsonWriter, JobRootConfiguration)} while the configuration's
     * JSON object is still open, after all common properties have been written.</p>
     *
     * @param out JSON writer positioned inside the configuration object
     * @param value job root configuration being serialized
     * @throws IOException if writing to the JSON writer fails
     */
    protected abstract void writeCustomized(JsonWriter out, T value) throws IOException;
}