blob: 2665c7f753de096f39b3fdf30f80cd2bef4a4dc0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.services.persistence;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.PersistenceException;
import org.apache.batchee.container.impl.JobExecutionImpl;
import org.apache.batchee.container.impl.JobInstanceImpl;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.StepExecutionImpl;
import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.persistence.jpa.EntityManagerProvider;
import org.apache.batchee.container.services.persistence.jpa.TransactionProvider;
import org.apache.batchee.container.services.persistence.jpa.domain.CheckpointEntity;
import org.apache.batchee.container.services.persistence.jpa.domain.JobExecutionEntity;
import org.apache.batchee.container.services.persistence.jpa.domain.JobInstanceEntity;
import org.apache.batchee.container.services.persistence.jpa.domain.StepExecutionEntity;
import org.apache.batchee.container.services.persistence.jpa.provider.DefaultEntityManagerProvider;
import org.apache.batchee.container.services.persistence.jpa.provider.DefaultTransactionProvider;
import org.apache.batchee.container.services.persistence.jpa.provider.EEEntityManagerProvider;
import org.apache.batchee.container.services.persistence.jpa.provider.EETransactionProvider;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.container.status.StepStatus;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.operations.NoSuchJobExecutionException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import javax.persistence.EntityManager;
import javax.persistence.NoResultException;
import javax.persistence.TemporalType;
import javax.persistence.TypedQuery;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.apache.batchee.container.util.Serializations.deserialize;
import static org.apache.batchee.container.util.Serializations.serialize;
public class JPAPersistenceManagerService implements PersistenceManagerService {
// Logger of this service; performRollback uses it to report a failure raised while rolling back.
private final static Logger LOGGER = Logger.getLogger(JPAPersistenceManagerService.class.getName());
// Named delete queries executed in array order by cleanUp(long); each receives an "instanceId" parameter.
private static final String[] DELETE_ID_QUERIES = {
StepExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, CheckpointEntity.Queries.DELETE_BY_INSTANCE_ID,
JobExecutionEntity.Queries.DELETE_BY_INSTANCE_ID, JobInstanceEntity.Queries.DELETE_BY_INSTANCE_ID
};
// Named delete queries executed in array order by cleanUp(Date); each receives a "date" parameter.
// NOTE(review): unlike DELETE_ID_QUERIES, the job instance query runs before the job execution query here;
// presumably the instance query relies on execution rows still being present - confirm against the query definitions.
private static final String[] DELETE_DATE_QUERIES = {
StepExecutionEntity.Queries.DELETE_BY_DATE, CheckpointEntity.Queries.DELETE_BY_DATE,
JobInstanceEntity.Queries.DELETE_BY_DATE, JobExecutionEntity.Queries.DELETE_BY_DATE
};
// Creates and releases the EntityManager used by every operation; instantiated and initialised in init(Properties).
private EntityManagerProvider emProvider;
// Starts, commits and rolls back the transactions around write operations; instantiated and initialised in init(Properties).
private TransactionProvider txProvider;
/**
 * Runs every query of {@code DELETE_ID_QUERIES}, in array order, for the given job instance
 * inside a single transaction.
 *
 * @param instanceId value bound to the "instanceId" parameter of each delete query
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void cleanUp(final long instanceId) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            for (int index = 0; index < DELETE_ID_QUERIES.length; index++) {
                entityManager.createNamedQuery(DELETE_ID_QUERIES[index])
                    .setParameter("instanceId", instanceId)
                    .executeUpdate();
            }
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        // the entity manager is handed back whatever the outcome
        emProvider.release(entityManager);
    }
}
/**
 * Runs every query of {@code DELETE_DATE_QUERIES}, in array order, with the given date
 * inside a single transaction.
 *
 * @param until value bound (as a TIMESTAMP) to the "date" parameter of each delete query
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void cleanUp(final Date until) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            for (int index = 0; index < DELETE_DATE_QUERIES.length; index++) {
                entityManager.createNamedQuery(DELETE_DATE_QUERIES[index])
                    .setParameter("date", until, TemporalType.TIMESTAMP)
                    .executeUpdate();
            }
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        // the entity manager is handed back whatever the outcome
        emProvider.release(entityManager);
    }
}
/**
 * Looks up the step executions stored for a job instance and step name and converts
 * the first one into a {@link StepStatus}.
 *
 * @param instanceId value of the "instanceId" query parameter
 * @param stepName   value of the "step" query parameter
 * @return the status built from the first matching entity, or {@code null} when the query yields nothing
 */
@Override
public StepStatus getStepStatus(final long instanceId, final String stepName) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<StepExecutionEntity> query =
            entityManager.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_INSTANCE_AND_NAME, StepExecutionEntity.class);
        query.setParameter("instanceId", instanceId);
        query.setParameter("step", stepName);
        final List<StepExecutionEntity> matches = query.getResultList();
        if (matches == null || matches.isEmpty()) {
            return null;
        }

        final StepExecutionEntity first = matches.get(0);
        final StepStatus result = new StepStatus(first.getId(), first.getStartCount());
        result.setBatchStatus(first.getBatchStatus());
        result.setExitStatus(first.getExitStatus());
        result.setNumPartitions(first.getNumPartitions());
        result.setLastRunStepExecutionId(first.getLastRunStepExecutionId());
        // persistent user data is only wrapped when some was stored
        if (first.getPersistentData() != null) {
            result.setPersistentUserData(new PersistentDataWrapper(first.getPersistentData()));
        }
        return result;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Collects the job names returned by the FIND_JOBNAMES named query into a sorted set.
 * The query's "pattern" parameter is {@code PartitionedStepBuilder.JOB_ID_SEPARATOR} followed by "%".
 *
 * @return a new {@link TreeSet} holding the names returned by the query
 */
@Override
public Set<String> getJobNames() {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<String> query = entityManager.createNamedQuery(JobInstanceEntity.Queries.FIND_JOBNAMES, String.class);
        query.setParameter("pattern", PartitionedStepBuilder.JOB_ID_SEPARATOR + "%");
        final List<String> names = query.getResultList();
        return new TreeSet<String>(names);
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Builds the {@link JobStatus} of the job instance found through the FIND_FROM_EXECUTION query.
 *
 * @param executionId value of the "executionId" query parameter
 * @return the status populated from the job instance entity
 * @throws NoSuchJobExecutionException when the query returns no result
 */
@Override
public JobStatus getJobStatusFromExecution(final long executionId) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<JobInstanceEntity> query =
            entityManager.createNamedQuery(JobInstanceEntity.Queries.FIND_FROM_EXECUTION, JobInstanceEntity.class);
        query.setParameter("executionId", executionId);
        final JobInstanceEntity instance = query.getSingleResult();

        final JobStatus result = new JobStatus(instance.getJobInstanceId());
        setJobStatusData(result, instance);
        return result;
    } catch (NoResultException notFound) {
        throw new NoSuchJobExecutionException("Execution #" + executionId, notFound);
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Loads a step execution entity by primary key and converts it into a {@link StepExecutionImpl}.
 * NOTE(review): the result of {@code find} is dereferenced without a null check, so an unknown id
 * surfaces as a NullPointerException - confirm whether callers rely on that.
 *
 * @param stepExecId primary key of the step execution entity
 * @return the populated step execution
 */
@Override
public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final StepExecutionEntity found = entityManager.find(StepExecutionEntity.class, stepExecId);
        final StepExecutionImpl result = new StepExecutionImpl(found.getExecution().getExecutionId(), stepExecId);
        setStepExecutionData(found, result);
        return result;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Copies statuses, step name, timestamps, metric counters and the deserialized persistent
 * data from a step execution entity onto a {@link StepExecutionImpl}.
 *
 * @param entity source entity
 * @param stepEx target execution object
 * @throws PersistenceException when the persistent data cannot be deserialized
 */
private void setStepExecutionData(final StepExecutionEntity entity, final StepExecutionImpl stepEx) {
    // identity and status
    stepEx.setStepName(entity.getStepName());
    stepEx.setBatchStatus(entity.getBatchStatus());
    stepEx.setExitStatus(entity.getExitStatus());

    // timestamps
    stepEx.setStartTime(entity.getStartTime());
    stepEx.setEndTime(entity.getEndTime());

    // metric counters
    stepEx.setReadCount(entity.getRead());
    stepEx.setReadSkipCount(entity.getReadSkip());
    stepEx.setProcessSkipCount(entity.getProcessSkip());
    stepEx.setFilterCount(entity.getFilter());
    stepEx.setWriteCount(entity.getWrite());
    stepEx.setWriteSkipCount(entity.getWriteSkip());
    stepEx.setCommitCount(entity.getCommit());
    stepEx.setRollbackCount(entity.getRollback());

    // the user data is deserialized last, so everything above is set even when this step fails
    try {
        stepEx.setPersistentUserData(deserialize(entity.getPersistentData()));
    } catch (final Exception failure) {
        throw new PersistenceException(failure);
    }
}
/**
 * Lists the step executions stored for a job execution.
 *
 * @param execid value of the "executionId" parameter of the FIND_BY_EXECUTION query
 * @return one {@link StepExecutionImpl} per entity, in query order; an empty list when the query returns {@code null}
 */
@Override
public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<StepExecutionEntity> query =
            entityManager.createNamedQuery(StepExecutionEntity.Queries.FIND_BY_EXECUTION, StepExecutionEntity.class);
        query.setParameter("executionId", execid);
        final List<StepExecutionEntity> entities = query.getResultList();
        if (entities == null) {
            return Collections.emptyList();
        }

        final List<StepExecution> result = new ArrayList<StepExecution>(entities.size());
        for (final StepExecutionEntity current : entities) {
            final StepExecutionImpl converted = new StepExecutionImpl(execid, current.getId());
            setStepExecutionData(current, converted);
            result.add(converted);
        }
        return result;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Reloads the step execution entity identified by the step context's internal execution id and
 * overwrites its data with the current values of the step context, inside a transaction.
 *
 * @param jobExecId   id of the owning job execution
 * @param stepContext source of statuses, timestamps, metrics and persistent user data
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void updateStepExecution(final long jobExecId, final StepContextImpl stepContext) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final StepExecutionEntity target = entityManager.find(StepExecutionEntity.class, stepContext.getStepInternalExecID());

            // metrics are read one by one, in the same order the values are handed to setStepData
            final long reads = stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue();
            final long writes = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue();
            final long commits = stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue();
            final long rollbacks = stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue();
            final long readSkips = stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue();
            final long processSkips = stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue();
            final long filtered = stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue();
            final long writeSkips = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue();

            setStepData(entityManager, jobExecId, stepContext,
                reads, writes, commits, rollbacks, readSkips, processSkips, filtered, writeSkips,
                target);
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Creates and persists a new step execution entity populated from the step context.
 * The entity is filled before the transaction is started; only persist and commit run inside it.
 *
 * @param jobExecId   id of the owning job execution
 * @param stepContext source of statuses, timestamps, metrics and persistent user data
 * @return a {@link StepExecutionImpl} carrying the job execution id, the new entity id and the step name
 * @throws BatchContainerRuntimeException wrapping a persist/commit failure, after a rollback was attempted
 */
@Override
public StepExecutionImpl createStepExecution(final long jobExecId, final StepContextImpl stepContext) {
    final StepExecutionEntity created = new StepExecutionEntity();
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        // metrics are read one by one, in the same order the values are handed to setStepData
        final long reads = stepContext.metricsAsMap().get(Metric.MetricType.READ_COUNT.name()).getValue();
        final long writes = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_COUNT.name()).getValue();
        final long commits = stepContext.metricsAsMap().get(Metric.MetricType.COMMIT_COUNT.name()).getValue();
        final long rollbacks = stepContext.metricsAsMap().get(Metric.MetricType.ROLLBACK_COUNT.name()).getValue();
        final long readSkips = stepContext.metricsAsMap().get(Metric.MetricType.READ_SKIP_COUNT.name()).getValue();
        final long processSkips = stepContext.metricsAsMap().get(Metric.MetricType.PROCESS_SKIP_COUNT.name()).getValue();
        final long filtered = stepContext.metricsAsMap().get(Metric.MetricType.FILTER_COUNT.name()).getValue();
        final long writeSkips = stepContext.metricsAsMap().get(Metric.MetricType.WRITE_SKIP_COUNT.name()).getValue();
        setStepData(entityManager, jobExecId, stepContext,
            reads, writes, commits, rollbacks, readSkips, processSkips, filtered, writeSkips,
            created);

        final Object transaction = txProvider.start(entityManager);
        try {
            entityManager.persist(created);
            txProvider.commit(transaction);

            final StepExecutionImpl result = new StepExecutionImpl(jobExecId, created.getId());
            result.setStepName(stepContext.getStepName());
            return result;
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Fills a step execution entity from a step context and explicit metric values.
 * The owning job execution is looked up first and the serialized persistent user data is set last.
 *
 * @throws PersistenceException when the persistent user data cannot be serialized
 */
//CHECKSTYLE:OFF
private void setStepData(final EntityManager em,
                         final long jobExecId, final StepContextImpl stepContext,
                         final long readCount, final long writeCount,
                         final long commitCount, final long rollbackCount,
                         final long readSkipCount, final long processSkipCount,
                         final long filterCount, final long writeSkipCount,
                         final StepExecutionEntity entity) {
    //CHECKSTYLE:ON
    entity.setExecution(em.find(JobExecutionEntity.class, jobExecId));

    // values taken from the step context
    entity.setStepName(stepContext.getStepName());
    entity.setBatchStatus(stepContext.getBatchStatus());
    entity.setExitStatus(stepContext.getExitStatus());
    entity.setStartTime(stepContext.getStartTimeTS());
    entity.setEndTime(stepContext.getEndTimeTS());

    // metric values supplied by the caller
    entity.setRead(readCount);
    entity.setReadSkip(readSkipCount);
    entity.setProcessSkip(processSkipCount);
    entity.setFilter(filterCount);
    entity.setWrite(writeCount);
    entity.setWriteSkip(writeSkipCount);
    entity.setCommit(commitCount);
    entity.setRollback(rollbackCount);

    try {
        entity.setPersistentData(serialize(stepContext.getPersistentUserData()));
    } catch (final IOException serializationFailure) {
        throw new PersistenceException(serializationFailure);
    }
}
/**
 * Resolves the id of the job instance a job execution belongs to.
 *
 * @param executionId primary key of the job execution entity
 * @return the job instance id of the execution's instance
 * @throws NoSuchJobExecutionException when no job execution entity exists for the id
 */
@Override
public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, executionId);
        if (execution != null) {
            return execution.getInstance().getJobInstanceId();
        }
        throw new NoSuchJobExecutionException("Execution #" + executionId);
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Collects the ids of the job executions returned by the FIND_RUNNING query for a job name,
 * using {@code JobExecutionEntity.Queries.RUNNING_STATUSES} as the "statuses" parameter.
 *
 * @param jobName value of the "name" query parameter
 * @return the execution ids; an empty set when the query returns {@code null}
 */
@Override
public Set<Long> jobOperatorGetRunningExecutions(final String jobName) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<JobExecutionEntity> query =
            entityManager.createNamedQuery(JobExecutionEntity.Queries.FIND_RUNNING, JobExecutionEntity.class);
        query.setParameter("name", jobName);
        query.setParameter("statuses", JobExecutionEntity.Queries.RUNNING_STATUSES);
        final List<JobExecutionEntity> running = query.getResultList();
        if (running == null) {
            return Collections.emptySet();
        }

        final Set<Long> executionIds = new HashSet<Long>(running.size());
        for (final JobExecutionEntity execution : running) {
            executionIds.add(execution.getExecutionId());
        }
        return executionIds;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Lists the job executions stored for a job instance.
 *
 * @param jobInstanceId value of the "instanceId" parameter of the FIND_BY_INSTANCE query
 * @return one {@link JobExecutionImpl} per entity, in query order; an empty list when the query returns {@code null}
 */
@Override
public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
    final EntityManager em = emProvider.newEntityManager();
    try {
        final List<JobExecutionEntity> list = em.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class)
            .setParameter("instanceId", jobInstanceId).getResultList();
        if (list == null) {
            return Collections.emptyList();
        }

        final List<InternalJobExecution> result = new ArrayList<InternalJobExecution>(list.size());
        for (final JobExecutionEntity entity : list) {
            final JobExecutionImpl jobEx = new JobExecutionImpl(entity.getExecutionId(), jobInstanceId, this);
            jobEx.setCreateTime(entity.getCreateTime());
            jobEx.setStartTime(entity.getStartTime());
            jobEx.setEndTime(entity.getEndTime());
            jobEx.setLastUpdateTime(entity.getUpdateTime());
            // Same guard as jobOperatorGetJobExecution: an execution without a stored batch status
            // must not make the whole listing fail with a NullPointerException.
            if (entity.getBatchStatus() != null) {
                jobEx.setBatchStatus(entity.getBatchStatus().name());
            }
            jobEx.setExitStatus(entity.getExitStatus());
            jobEx.setJobName(entity.getInstance().getName());
            jobEx.setJobParameters(entity.getJobProperties());
            result.add(jobEx);
        }
        return result;
    } finally {
        emProvider.release(em);
    }
}
/**
 * Stores the final batch status and exit status of a job execution and sets both its
 * update time and its end time to the given timestamp, inside a transaction.
 *
 * @param key         primary key of the job execution entity
 * @param batchStatus batch status to store
 * @param exitStatus  exit status to store
 * @param updatets    timestamp used for update time and end time
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus, final String exitStatus, final Timestamp updatets) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
            execution.setBatchStatus(batchStatus);
            execution.setUpdateTime(updatets);
            execution.setEndTime(updatets);
            execution.setExitStatus(exitStatus);
            entityManager.merge(execution);
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Loads a job execution by primary key and converts it into a {@link JobExecutionImpl}.
 *
 * @param jobExecutionId primary key of the job execution entity
 * @return the populated job execution; its batch status is only set when the entity has one
 * @throws NoSuchJobExecutionException when no entity exists for the id
 */
@Override
public InternalJobExecution jobOperatorGetJobExecution(final long jobExecutionId) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, jobExecutionId);
        if (execution == null) {
            throw new NoSuchJobExecutionException("Execution #" + jobExecutionId);
        }

        final JobExecutionImpl result = new JobExecutionImpl(jobExecutionId, execution.getInstance().getJobInstanceId(), this);
        result.setCreateTime(execution.getCreateTime());
        result.setStartTime(execution.getStartTime());
        result.setEndTime(execution.getEndTime());
        result.setJobParameters(execution.getJobProperties());
        result.setLastUpdateTime(execution.getUpdateTime());
        if (execution.getBatchStatus() != null) {
            result.setBatchStatus(execution.getBatchStatus().name());
        }
        result.setExitStatus(execution.getExitStatus());
        result.setJobName(execution.getInstance().getName());
        return result;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Marks a job execution as {@code STARTED} and sets both its start time and its update time
 * to the given timestamp, inside a transaction.
 *
 * @param key     primary key of the job execution entity
 * @param startTS timestamp used for start time and update time
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void markJobStarted(final long key, final Timestamp startTS) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
            execution.setBatchStatus(BatchStatus.STARTED);
            execution.setStartTime(startTS);
            execution.setUpdateTime(startTS);
            entityManager.merge(execution);
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Returns the job properties stored with a job execution.
 *
 * @param executionId primary key of the job execution entity
 * @return the job properties held by the entity
 * @throws NoSuchJobExecutionException when no job execution entity exists for the id
 */
@Override
public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
    final EntityManager em = emProvider.newEntityManager();
    try {
        final JobExecutionEntity entity = em.find(JobExecutionEntity.class, executionId);
        // find() yields null for an unknown id; report it through the declared exception
        // (as getJobInstanceIdByExecutionId does) instead of failing with a NullPointerException.
        if (entity == null) {
            throw new NoSuchJobExecutionException("Execution #" + executionId);
        }
        return entity.getJobProperties();
    } finally {
        emProvider.release(em);
    }
}
/**
 * Reads the exit status stored with a job execution.
 * NOTE(review): the result of {@code find} is dereferenced without a null check - confirm the
 * intended behaviour for an unknown id.
 *
 * @param key primary key of the job execution entity
 * @return the entity's exit status
 */
@Override
public String jobOperatorQueryJobExecutionExitStatus(final long key) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
        return execution.getExitStatus();
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Reads the batch status stored with a job execution and returns its enum constant name.
 * NOTE(review): neither the result of {@code find} nor the batch status is null-checked -
 * confirm the intended behaviour for an unknown id or an unset status.
 *
 * @param key primary key of the job execution entity
 * @return {@code name()} of the entity's batch status
 */
@Override
public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
        return execution.getBatchStatus().name();
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Reads one of the four timestamps stored with a job execution.
 *
 * @param key      primary key of the job execution entity
 * @param timetype selects the timestamp: CREATE, END, LAST_UPDATED or STARTED
 * @return the create, end, update or start time of the entity, according to {@code timetype}
 * @throws IllegalArgumentException when {@code timetype} equals none of the four handled values
 */
@Override
public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timetype) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity execution = entityManager.find(JobExecutionEntity.class, key);
        if (timetype.equals(TimestampType.CREATE)) {
            return execution.getCreateTime();
        }
        if (timetype.equals(TimestampType.END)) {
            return execution.getEndTime();
        }
        if (timetype.equals(TimestampType.LAST_UPDATED)) {
            return execution.getUpdateTime();
        }
        if (timetype.equals(TimestampType.STARTED)) {
            return execution.getStartTime();
        }
        throw new IllegalArgumentException("Unexpected enum value.");
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Persists a new job execution entity for the given job instance and wraps it in a
 * {@link RuntimeFlowInSplitExecution}. Create time and update time are both the current system time.
 *
 * @param jobInstance instance the new execution is attached to
 * @param batchStatus initial batch status
 * @return the runtime execution carrying the new execution id, status and timestamps
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity created = new JobExecutionEntity();
        created.setCreateTime(new Timestamp(System.currentTimeMillis()));
        created.setUpdateTime(created.getCreateTime());
        created.setBatchStatus(batchStatus);

        final Object transaction = txProvider.start(entityManager);
        try {
            created.setInstance(entityManager.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
            entityManager.persist(created);
            txProvider.commit(transaction);

            final RuntimeFlowInSplitExecution result = new RuntimeFlowInSplitExecution(jobInstance, created.getExecutionId(), this);
            result.setBatchStatus(batchStatus.name());
            result.setCreateTime(created.getCreateTime());
            result.setLastUpdateTime(created.getCreateTime());
            return result;
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Returns the execution id of the single result of the MOST_RECENT query, limited to one row.
 *
 * @param jobInstanceId value of the "instanceId" query parameter
 * @return the execution id of the entity returned by the query
 */
@Override
public long getMostRecentExecutionId(final long jobInstanceId) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<JobExecutionEntity> query =
            entityManager.createNamedQuery(JobExecutionEntity.Queries.MOST_RECENT, JobExecutionEntity.class);
        query.setParameter("instanceId", jobInstanceId);
        query.setMaxResults(1);
        final JobExecutionEntity latest = query.getSingleResult();
        return latest.getExecutionId();
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Stores a new batch status and update time on a job execution, inside a transaction.
 *
 * @param executionId primary key of the job execution entity
 * @param batchStatus batch status to store
 * @param timestamp   new update time
 * @throws BatchContainerRuntimeException wrapping any failure (including an unknown execution id),
 *                                        after a rollback was attempted
 */
@Override
public void updateBatchStatusOnly(final long executionId, final BatchStatus batchStatus, final Timestamp timestamp) {
    final EntityManager em = emProvider.newEntityManager();
    try {
        final Object tx = txProvider.start(em);
        try {
            // The lookup and the setters run inside the guarded section so that any failure
            // (e.g. find() returning null) rolls the started transaction back instead of leaking it.
            final JobExecutionEntity instance = em.find(JobExecutionEntity.class, executionId);
            instance.setBatchStatus(batchStatus);
            instance.setUpdateTime(timestamp);
            em.merge(instance);
            txProvider.commit(tx);
        } catch (final Exception e) {
            throw new BatchContainerRuntimeException(performRollback(tx, e));
        }
    } finally {
        emProvider.release(em);
    }
}
/**
 * Persists a new job execution entity for the given job instance and wraps it in a
 * {@link RuntimeJobExecution}. Create time and update time are both the current system time.
 *
 * @param jobInstance   instance the new execution is attached to
 * @param jobParameters job properties stored with the execution
 * @param batchStatus   initial batch status
 * @return the runtime execution carrying the new execution id, status and timestamps
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobExecutionEntity created = new JobExecutionEntity();
        created.setJobProperties(jobParameters);
        created.setCreateTime(new Timestamp(System.currentTimeMillis()));
        created.setUpdateTime(created.getCreateTime());
        created.setBatchStatus(batchStatus);

        final Object transaction = txProvider.start(entityManager);
        try {
            created.setInstance(entityManager.find(JobInstanceEntity.class, jobInstance.getInstanceId()));
            entityManager.persist(created);
            txProvider.commit(transaction);

            final RuntimeJobExecution result = new RuntimeJobExecution(jobInstance, created.getExecutionId(), this);
            result.setBatchStatus(batchStatus.name());
            result.setCreateTime(created.getCreateTime());
            result.setLastUpdateTime(created.getCreateTime());
            return result;
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Creates a job instance with the given name and no job XML by delegating to
 * {@link #createJobInstance(String, String)}.
 *
 * @param name name of the new job instance
 * @return the job instance produced by {@code createJobInstance}
 */
@Override
public JobInstance createSubJobInstance(final String name) {
    final String noJobXml = null;
    return createJobInstance(name, noJobXml);
}
/**
 * Returns one page of job instance ids for a job name, using the FIND_BY_NAME query.
 *
 * @param jobName value of the "name" query parameter
 * @param start   index of the first result to return
 * @param count   maximum number of results to return
 * @return the ids in query order; an empty list when the query returns {@code null}
 */
@Override
public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<JobInstanceEntity> query =
            entityManager.createNamedQuery(JobInstanceEntity.Queries.FIND_BY_NAME, JobInstanceEntity.class);
        query.setParameter("name", jobName);
        query.setFirstResult(start);
        query.setMaxResults(count);

        final List<JobInstanceEntity> page = query.getResultList();
        if (page == null) {
            return Collections.emptyList();
        }

        final List<Long> instanceIds = new ArrayList<Long>(page.size());
        for (final JobInstanceEntity instance : page) {
            instanceIds.add(instance.getJobInstanceId());
        }
        return instanceIds;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Persists a new job instance entity and returns the matching {@link JobInstanceImpl}.
 *
 * @param name   job name stored on the entity and set on the returned instance
 * @param jobXml job XML stored on the entity and passed to the returned instance
 * @return a job instance carrying the new instance id, the job XML and the job name
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public JobInstance createJobInstance(final String name, final String jobXml) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final JobInstanceEntity created = new JobInstanceEntity();
        created.setName(name);
        created.setJobXml(jobXml);

        final Object transaction = txProvider.start(entityManager);
        try {
            entityManager.persist(created);
            txProvider.commit(transaction);

            final JobInstanceImpl result = new JobInstanceImpl(created.getJobInstanceId(), jobXml);
            result.setJobName(name);
            return result;
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Returns the result of the COUNT_BY_NAME query for a job name, narrowed to an {@code int}.
 *
 * @param jobName value of the "name" query parameter
 * @return {@code intValue()} of the number returned by the query
 */
@Override
public int jobOperatorGetJobInstanceCount(final String jobName) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<Number> query = entityManager.createNamedQuery(JobInstanceEntity.Queries.COUNT_BY_NAME, Number.class);
        query.setParameter("name", jobName);
        final Number total = query.getSingleResult();
        return total.intValue();
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Copies the values of a {@link StepStatus} onto the step execution entity with the given id,
 * inside a transaction.
 *
 * @param stepExecutionId primary key of the step execution entity
 * @param stepStatus      source of statuses, partition count, start count and raw persistent data
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final StepExecutionEntity step = entityManager.find(StepExecutionEntity.class, stepExecutionId);
            step.setBatchStatus(stepStatus.getBatchStatus());
            step.setExitStatus(stepStatus.getExitStatus());
            step.setLastRunStepExecutionId(stepStatus.getLastRunStepExecutionId());
            step.setNumPartitions(stepStatus.getNumPartitions());
            step.setPersistentData(stepStatus.getRawPersistentUserData());
            step.setStartCount(stepStatus.getStartCount());
            entityManager.merge(step);
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Builds an in-memory {@link StepStatus} for a step execution id; nothing is written to the database here.
 *
 * @param stepExecId id handed to the new status object
 * @return a new step status
 */
@Override
public StepStatus createStepStatus(final long stepExecId) {
    // instance already created
    final StepStatus status = new StepStatus(stepExecId);
    return status;
}
/**
 * Copies the values of a {@link JobStatus} onto the job instance entity with the given id and
 * re-attaches every job execution of that instance to the entity, inside one transaction.
 *
 * @param instanceId primary key of the job instance entity
 * @param jobStatus  source of statuses, current step, latest execution id and restart point;
 *                   its job instance, when present, also supplies the entity's name
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final JobInstanceEntity instance = entityManager.find(JobInstanceEntity.class, instanceId);
            instance.setBatchStatus(jobStatus.getBatchStatus());
            instance.setStep(jobStatus.getCurrentStepId());
            instance.setLatestExecution(jobStatus.getLatestExecutionId());
            instance.setExitStatus(jobStatus.getExitStatus());
            instance.setRestartOn(jobStatus.getRestartOn());
            if (jobStatus.getJobInstance() != null) {
                instance.setName(jobStatus.getJobInstance().getJobName());
            }
            entityManager.merge(instance);

            final TypedQuery<JobExecutionEntity> query =
                entityManager.createNamedQuery(JobExecutionEntity.Queries.FIND_BY_INSTANCE, JobExecutionEntity.class);
            query.setParameter("instanceId", instanceId);
            final List<JobExecutionEntity> executions = query.getResultList();
            if (executions != null) {
                for (final JobExecutionEntity execution : executions) {
                    execution.setInstance(instance);
                    entityManager.merge(execution);
                }
            }

            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Builds the {@link JobStatus} of a job instance from its entity; the lookup runs inside a transaction.
 *
 * @param instanceId primary key of the job instance entity
 * @return the status populated from the entity
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public JobStatus getJobStatus(final long instanceId) {
    final JobStatus result = new JobStatus(instanceId);
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final JobInstanceEntity instance = entityManager.find(JobInstanceEntity.class, instanceId);
            setJobStatusData(result, instance);
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
        return result;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Copies statuses, current step, latest execution id, restart point and the job instance
 * representation from a job instance entity onto a {@link JobStatus}.
 *
 * @param status target status object
 * @param entity source entity
 */
private void setJobStatusData(final JobStatus status, final JobInstanceEntity entity) {
    // statuses
    status.setBatchStatus(entity.getBatchStatus());
    status.setExitStatus(entity.getExitStatus());

    // progress and restart information
    status.setCurrentStepId(entity.getStep());
    status.setLatestExecutionId(entity.getLatestExecution());
    status.setRestartOn(entity.getRestartOn());

    status.setJobInstance(entity.toJobInstance());
}
/**
 * Builds an in-memory {@link JobStatus} for a job instance id; nothing is written to the database here.
 *
 * @param jobInstanceId id handed to the new status object
 * @return a new job status
 */
@Override
public JobStatus createJobStatus(final long jobInstanceId) {
    // instance already created
    final JobStatus status = new JobStatus(jobInstanceId);
    return status;
}
/**
 * Stores the restart token of a checkpoint inside a transaction. When the FIND query returns
 * no checkpoint for the key, a new entity is persisted; otherwise the first result is updated
 * and merged.
 *
 * @param key   job instance id, step name and type identifying the checkpoint
 * @param value supplies the restart token to store
 * @throws BatchContainerRuntimeException wrapping any failure, after a rollback was attempted
 */
@Override
public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final Object transaction = txProvider.start(entityManager);
        try {
            final TypedQuery<CheckpointEntity> query =
                entityManager.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class);
            query.setParameter("jobInstanceId", key.getJobInstanceId());
            query.setParameter("stepName", key.getStepName());
            query.setParameter("type", key.getType());
            final List<CheckpointEntity> existing = query.getResultList();

            if (existing == null || existing.isEmpty()) {
                // no checkpoint yet for this key: create one
                final CheckpointEntity created = new CheckpointEntity();
                created.setInstance(entityManager.find(JobInstanceEntity.class, key.getJobInstanceId()));
                created.setStepName(key.getStepName());
                created.setType(key.getType());
                created.setData(value.getRestartToken());
                entityManager.persist(created);
            } else {
                // update the first checkpoint found
                final CheckpointEntity current = existing.iterator().next();
                current.setData(value.getRestartToken());
                entityManager.merge(current);
            }
            txProvider.commit(transaction);
        } catch (final Exception failure) {
            final Exception original = performRollback(transaction, failure);
            throw new BatchContainerRuntimeException(original);
        }
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Loads the checkpoint matching a key and converts it into a {@link CheckpointData}.
 *
 * @param key job instance id, step name and type identifying the checkpoint
 * @return the checkpoint data with its restart token, or {@code null} when the query has no result
 */
@Override
public CheckpointData getCheckpointData(final CheckpointDataKey key) {
    final EntityManager entityManager = emProvider.newEntityManager();
    try {
        final TypedQuery<CheckpointEntity> query =
            entityManager.createNamedQuery(CheckpointEntity.Queries.FIND, CheckpointEntity.class);
        query.setParameter("jobInstanceId", key.getJobInstanceId());
        query.setParameter("stepName", key.getStepName());
        query.setParameter("type", key.getType());
        final CheckpointEntity stored = query.getSingleResult();

        final CheckpointData result = new CheckpointData(stored.getInstance().getJobInstanceId(), stored.getStepName(), stored.getType());
        result.setRestartToken(stored.getData());
        return result;
    } catch (final NoResultException notFound) {
        return null;
    } finally {
        emProvider.release(entityManager);
    }
}
/**
 * Instantiates and initialises the transaction provider and then the entity manager provider.
 * <p>
 * The implementation classes come from the "persistence.jpa.transaction-provider" and
 * "persistence.jpa.entity-manager-provider" properties. Their defaults are the EE variants when
 * "persistence.jpa.ee" equals "true" (case-insensitive) and the Default variants otherwise.
 * Classes are loaded through the thread context class loader and created with their no-arg constructor.
 *
 * @param batchConfig configuration read here and passed to both providers' {@code init}
 * @throws BatchContainerRuntimeException when a provider class cannot be loaded, instantiated or cast
 */
@Override
public void init(final Properties batchConfig) {
    final String eeFlag = batchConfig.getProperty("persistence.jpa.ee", "false");
    final boolean ee = "true".equalsIgnoreCase(eeFlag);

    final String defaultTxProvider;
    if (ee) {
        defaultTxProvider = EETransactionProvider.class.getName();
    } else {
        defaultTxProvider = DefaultTransactionProvider.class.getName();
    }
    final String txProviderClass = batchConfig.getProperty("persistence.jpa.transaction-provider", defaultTxProvider);
    try {
        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
        final Class<?> txProviderType = loader.loadClass(txProviderClass);
        txProvider = TransactionProvider.class.cast(txProviderType.newInstance());
    } catch (final Exception failure) {
        throw new BatchContainerRuntimeException(failure);
    }
    txProvider.init(batchConfig);

    final String defaultEmProvider;
    if (ee) {
        defaultEmProvider = EEEntityManagerProvider.class.getName();
    } else {
        defaultEmProvider = DefaultEntityManagerProvider.class.getName();
    }
    final String providerClass = batchConfig.getProperty("persistence.jpa.entity-manager-provider", defaultEmProvider);
    try {
        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
        final Class<?> emProviderType = loader.loadClass(providerClass);
        emProvider = EntityManagerProvider.class.cast(emProviderType.newInstance());
    } catch (final Exception failure) {
        throw new BatchContainerRuntimeException(failure);
    }
    emProvider.init(batchConfig);
}
/**
 * Asks the transaction provider to roll back and hands the triggering exception back to the caller.
 * An exception raised by the rollback itself is only logged at SEVERE level, so it can never
 * replace the exception that caused the rollback.
 *
 * @param tx                transaction handle obtained from the transaction provider
 * @param originalException the exception that made the rollback necessary
 * @return {@code originalException}, unchanged
 */
private Exception performRollback(Object tx, Exception originalException) {
    try {
        txProvider.rollback(tx, originalException);
    } catch (Exception rollbackFailure) {
        final String message =
            "Got an Exception while rolling back due to another Exception. Printing Rollback Exception and re-throwing original one";
        LOGGER.log(Level.SEVERE, message, rollbackFailure);
    }
    return originalException;
}
}