blob: 079565aa9fc9f39a49efcc113d62040d7afdaa77 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.spi;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.StepExecutionImpl;
import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
import org.apache.batchee.container.impl.controller.chunk.CheckpointType;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.persistence.JDBCPersistenceManagerService;
import org.apache.batchee.container.services.persistence.JPAPersistenceManagerService;
import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
import org.apache.batchee.container.status.JobStatus;
import org.junit.Test;
import javax.batch.operations.NoSuchJobExecutionException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PersistenceManagerServiceTest {
@Test
public void cleanUpUntil() {
for (final PersistenceManagerService service : asList(
new JDBCPersistenceManagerService() {{
init(new Properties());
}},
new JPAPersistenceManagerService() {{
init(new Properties());
}},
new MemoryPersistenceManagerService() {{
init(new Properties());
}})) {
System.out.println("");
System.out.println(" " + service);
System.out.println("");
for (int i = 0; i < 3; i++) {
final JobInstance instance = service.createJobInstance("test", "xml");
final RuntimeJobExecution exec = service.createJobExecution(instance, new Properties(), BatchStatus.COMPLETED);
final StepExecutionImpl step = service.createStepExecution(exec.getExecutionId(), new StepContextImpl("step"));
service.createStepStatus(step.getStepExecutionId()).setBatchStatus(BatchStatus.STARTED);
service.setCheckpointData(
new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER),
new CheckpointData(instance.getInstanceId(), "step", CheckpointType.READER) {{
setRestartToken("restart".getBytes());
}});
service.createJobStatus(instance.getInstanceId());
service.updateJobStatus(instance.getInstanceId(), new JobStatus(instance) {{
setBatchStatus(BatchStatus.COMPLETED);
}});
service.updateWithFinalExecutionStatusesAndTimestamps(exec.getExecutionId(), BatchStatus.COMPLETED, "ok", new Timestamp(System.currentTimeMillis()));
// sanity checks we persisted data before deleting them
assertNotNull(service.getJobStatus(instance.getInstanceId()));
assertNotNull(service.getJobInstanceIdByExecutionId(exec.getExecutionId()));
assertNotNull(service.jobOperatorGetJobExecution(exec.getExecutionId()));
assertNotNull(service.getStepExecutionByStepExecutionId(step.getStepExecutionId()));
assertNotNull(service.getCheckpointData(new CheckpointDataKey(instance.getInstanceId(), "step", CheckpointType.READER)));
if (i != 2) { // skip last since we are not there to have a break
try { // add some delay
Thread.sleep(1000);
} catch (final InterruptedException e) {
Thread.interrupted();
fail();
}
}
}
// now delete until the first one (+ delta)
final List<Long> instances = service.jobOperatorGetJobInstanceIds("test", 0, 10);
assertEquals(3, instances.size());
Collections.sort(instances); // we use increments everywhere so should be fine
final InternalJobExecution firstExec = service.jobOperatorGetJobExecution(service.getMostRecentExecutionId(instances.iterator().next()));
final Date until = new Date(firstExec.getEndTime().getTime() + 100);
service.cleanUp(until);
assertEquals(2, service.jobOperatorGetJobInstanceIds("test", 0, 10).size());
assertTrue(service.getStepExecutionsForJobExecution(firstExec.getExecutionId()).isEmpty());
assertNull(service.getCheckpointData(new CheckpointDataKey(firstExec.getInstanceId(), "step", CheckpointType.READER)));
try {
service.getJobInstanceIdByExecutionId(firstExec.getExecutionId());
fail();
} catch (final NoSuchJobExecutionException nsje) {
// ok
}
assertFalse(service.jobOperatorGetJobInstanceIds("test", 0, 10).contains(firstExec.getInstanceId()));
}
}
}