| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.jobs.impl; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import org.apache.sling.jobs.Job; |
| import org.apache.sling.jobs.JobUpdate; |
| import org.apache.sling.jobs.Types; |
| import org.apache.sling.jobs.impl.spi.MapValueAdapter; |
| |
| /** |
| * Represents messages sent to the Job via a message queue. |
| * Abort, stop and update messages should be sent via a priority queue. |
| * Start messages should be sent by a processing queue. |
| */ |
| public class JobUpdateImpl implements MapValueAdapter, JobUpdate { |
| private static final long TTL = 1000 * 60; |
| private long updateTimestamp; |
| private long expires; |
| private JobUpdateCommand command; |
| private Types.JobQueue jobQueue; |
| private String id; |
| private Job.JobState jobState; |
| private Map<String, Object> properties; |
| private int retryCount; |
| private int numberOfRetries; |
| private long startedAt; |
| private long createdAt; |
| private long finishedAt; |
| private String resultMessage; |
| private Types.JobType jobType; |
| |
| |
| /** |
| * Create an update message using a job, command and update properties. Only the update properties will in the update. |
| * The job will be used to specify the job jobQueue, job ID and job state of the update message. |
| * @param job the job |
| * @param command the command |
| * @param properties properties in the update message. |
| */ |
| public JobUpdateImpl(@Nonnull Job job, @Nonnull JobUpdateCommand command, @Nonnull Map<String, Object> properties) { |
| if ( job == null ) { |
| throw new IllegalArgumentException("Job argument cant be null"); |
| } |
| if ( command == null ) { |
| throw new IllegalArgumentException("JobCommand argument cant be null"); |
| } |
| if ( properties == null ) { |
| throw new IllegalArgumentException("Map of properties cant be null"); |
| } |
| |
| jobQueue = job.getQueue(); |
| jobType = job.getJobType(); |
| id = job.getId(); |
| startedAt = job.getStarted(); |
| createdAt = job.getCreated(); |
| finishedAt = job.getFinished(); |
| retryCount = job.getRetryCount(); |
| jobState = job.getJobState(); |
| resultMessage = job.getResultMessage(); |
| numberOfRetries = job.getNumberOfRetries(); |
| updateTimestamp = System.currentTimeMillis(); |
| expires = updateTimestamp + TTL; |
| this.command = command; |
| this.properties = properties; |
| } |
| |
| /** |
| * Create a JobUpdateImpl based on a inbound message in the form of a Map. |
| * @param message a inbound message in map form. |
| */ |
| public JobUpdateImpl(@Nonnull Map<String, Object> message) { |
| if ( message == null ) { |
| throw new IllegalArgumentException("Message cant be null"); |
| } |
| fromMapValue(message); |
| } |
| |
| public JobUpdateImpl(@Nonnull String jobId, @Nonnull JobUpdateCommand command) { |
| if ( jobId == null ) { |
| throw new IllegalArgumentException("JobId argument cant be null"); |
| } |
| if ( command == null ) { |
| throw new IllegalArgumentException("JobUpdateCommand argument cant be null"); |
| } |
| jobQueue = Types.ANY_JOB_QUEUE; |
| id = jobId; |
| updateTimestamp = System.currentTimeMillis(); |
| expires = updateTimestamp + TTL; |
| jobState = Job.JobState.ANY_STATE; |
| this.command = command; |
| this.properties = Collections.emptyMap(); |
| |
| } |
| |
| |
| @Override |
| public long updateTimestamp() { |
| return updateTimestamp; |
| } |
| |
| @Override |
| public long expires() { |
| return expires; |
| } |
| |
| @Nonnull |
| @Override |
| public Types.JobType getJobType() { |
| return jobType; |
| } |
| |
| @Nonnull |
| @Override |
| public JobUpdateCommand getCommand() { |
| return command; |
| } |
| |
| @Nonnull |
| @Override |
| public Types.JobQueue getQueue() { |
| return jobQueue; |
| } |
| |
| @Nonnull |
| @Override |
| public String getId() { |
| return id; |
| } |
| |
| @Nonnull |
| @Override |
| public Job.JobState getState() { |
| return jobState; |
| } |
| |
| @Nonnull |
| @Override |
| public Map<String, Object> getProperties() { |
| return properties; |
| } |
| |
| @Override |
| public int getRetryCount() { |
| return retryCount; |
| } |
| |
| @Override |
| public int getNumberOfRetries() { |
| return numberOfRetries; |
| } |
| |
| @Override |
| public long getStarted() { |
| return startedAt; |
| } |
| |
| @Override |
| public long getCreated() { |
| return createdAt; |
| } |
| |
| @Override |
| public long getFinished() { |
| return finishedAt; |
| } |
| |
| @Override |
| public String getResultMessage() { |
| return resultMessage; |
| } |
| |
| |
| @Override |
| public void fromMapValue(@Nullable Object mapValue) { |
| if (mapValue != null && mapValue instanceof Map) { |
| @SuppressWarnings("unchecked") Map<String, Object> m = (Map<String, Object>) mapValue; |
| jobQueue = Types.jobQueue((String) Utils.getRequired(m, "tp")); |
| jobType = Types.jobType((String)Utils.getRequired(m, "jt")); |
| id = Utils.getRequired(m, "id"); |
| command = JobUpdateCommand.valueOf((String) Utils.getRequired(m, "cm")); |
| updateTimestamp = Utils.getRequired(m, "ts"); |
| expires = Utils.getRequired(m, "ex"); |
| if (command == JobUpdateCommand.UPDATE_JOB || command == JobUpdateCommand.START_JOB || command == JobUpdateCommand.RETRY_JOB ) { |
| startedAt = Utils.getOptional(m, "startedAt", 0L); |
| createdAt = Utils.getOptional(m, "createdAt", 0L); |
| finishedAt = Utils.getOptional(m, "finishedAt", 0L); |
| retryCount = Utils.getOptional(m, "retryCount", 0); |
| numberOfRetries = Utils.getOptional(m, "nRetries", 10); |
| jobState = Job.JobState.valueOf(Utils.getOptional(m, "jobState", Job.JobState.QUEUED.toString())); |
| resultMessage = Utils.getOptional(m, "resultMessage", null); |
| properties = Utils.getOptional(m, "properties", new HashMap<String, Object>()); |
| } else { |
| properties = new HashMap<String, Object>(); |
| } |
| } else { |
| throw new IllegalArgumentException("Cant populate JobImpl from "+mapValue); |
| } |
| } |
| |
| @Override |
| @Nonnull |
| public Object toMapValue() { |
| final Map<String, Object> builder = new HashMap<>(); |
| builder.put("tp", jobQueue.toString()); |
| builder.put("jt",jobType.toString()); |
| builder.put("id",id); |
| builder.put("cm", command.toString()); |
| builder.put("ts", this.updateTimestamp); |
| builder.put("ex", expires); |
| if ( command == JobUpdateCommand.UPDATE_JOB || command == JobUpdateCommand.START_JOB || command == JobUpdateCommand.RETRY_JOB ) { |
| builder.put("retryCount", retryCount); |
| builder.put("nRetries", numberOfRetries); |
| builder.put("startedAt", startedAt); |
| builder.put("createdAt", createdAt); |
| builder.put("finishedAt", finishedAt); |
| builder.put("jobState", jobState.toString()); |
| builder.put("resultMessage", resultMessage); |
| builder.put("properties", Collections.unmodifiableMap(properties)); |
| |
| } |
| return Collections.unmodifiableMap(builder); |
| } |
| } |