blob: d2e7b04da334a4519a757cb842eadcef49345931 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job.archive;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import org.apache.ambari.infra.job.JobContextRepository;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.repeat.RepeatStatus;
@RunWith(EasyMockRunner.class)
public class DocumentExporterTest extends EasyMockSupport {
private static final long JOB_EXECUTION_ID = 1L;
private static final long STEP_EXECUTION_ID = 1L;
private static final Document DOCUMENT_2 = new Document(new HashMap<String, Object>() {{
put("id", "2");
}});
private static final Document DOCUMENT_3 = new Document(new HashMap<String, Object>() {{
put("id", "3");
}});
private DocumentExporter documentExporter;
@Mock
private ItemStreamReader<Document> reader;
@Mock
private DocumentDestination documentDestination;
@Mock
private DocumentItemWriter documentItemWriter;
@Mock
private DocumentItemWriter documentItemWriter2;
@Mock
private DocumentItemWriter documentItemWriter3;
@Mock
private JobContextRepository jobContextRepository;
private ChunkContext chunkContext;
private static final Document DOCUMENT = new Document(new HashMap<String, Object>() {{ put("id", "1"); }});
@Before
public void setUp() throws Exception {
chunkContext = chunkContext(BatchStatus.STARTED);
documentExporter = documentExporter(2);
}
private DocumentExporter documentExporter(int writeBlockSize) {
return new DocumentExporter(reader, documentDestination, writeBlockSize, jobContextRepository);
}
private ChunkContext chunkContext(BatchStatus batchStatus) {
StepExecution stepExecution = new StepExecution("exportDoc", new JobExecution(JOB_EXECUTION_ID));
stepExecution.setId(STEP_EXECUTION_ID);
stepExecution.getJobExecution().setStatus(batchStatus);
return new ChunkContext(new StepContext(stepExecution));
}
@After
public void tearDown() throws Exception {
verifyAll();
}
@Test
public void testNothingToRead() throws Exception {
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(null);
reader.close(); expectLastCall();
replayAll();
documentExporter.execute(null, chunkContext);
}
private ExecutionContext executionContext(ChunkContext chunkContext) {
return chunkContext.getStepContext().getStepExecution().getExecutionContext();
}
@Test
public void testWriteLessDocumentsThanWriteBlockSize() throws Exception {
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
expect(reader.read()).andReturn(null);
reader.close(); expectLastCall();
documentItemWriter.close(); expectLastCall();
replayAll();
assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
}
@Test
public void testWriteMoreDocumentsThanWriteBlockSize() throws Exception {
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT_2);
documentItemWriter.write(DOCUMENT_2); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT_3);
documentItemWriter.close(); expectLastCall();
jobContextRepository.updateExecutionContext(chunkContext.getStepContext().getStepExecution());
expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter2);
documentItemWriter2.write(DOCUMENT_3); expectLastCall();
expect(reader.read()).andReturn(null);
reader.update(executionContext(chunkContext));
reader.close(); expectLastCall();
documentItemWriter2.close(); expectLastCall();
replayAll();
assertThat(documentExporter.execute(null, chunkContext), is(RepeatStatus.FINISHED));
}
@Test(expected = IOException.class)
public void testReadError() throws Exception {
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
expect(reader.read()).andThrow(new IOException("TEST"));
documentItemWriter.revert(); expectLastCall();
reader.close(); expectLastCall();
replayAll();
documentExporter.execute(null, chunkContext);
}
@Test(expected = UncheckedIOException.class)
public void testWriteError() throws Exception {
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall().andThrow(new UncheckedIOException(new IOException("TEST")));
documentItemWriter.revert(); expectLastCall();
reader.close(); expectLastCall();
replayAll();
documentExporter.execute(null, chunkContext);
}
@Test
public void testStopAndRestartExportsAllDocuments() throws Exception {
ChunkContext stoppingChunkContext = chunkContext(BatchStatus.STOPPING);
DocumentExporter documentExporter = documentExporter(1);
reader.open(executionContext(chunkContext)); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT);
expect(documentDestination.open(DOCUMENT)).andReturn(documentItemWriter);
documentItemWriter.write(DOCUMENT); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT_2);
expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(chunkContext.getStepContext().getStepExecution());
documentItemWriter.close(); expectLastCall();
reader.update(executionContext(this.chunkContext));
jobContextRepository.updateExecutionContext(this.chunkContext.getStepContext().getStepExecution());
expect(documentDestination.open(DOCUMENT_2)).andReturn(documentItemWriter2);
documentItemWriter2.write(DOCUMENT_2); expectLastCall();
expect(reader.read()).andReturn(DOCUMENT_3);
expect(jobContextRepository.getStepExecution(JOB_EXECUTION_ID, STEP_EXECUTION_ID)).andReturn(stoppingChunkContext.getStepContext().getStepExecution());
documentItemWriter2.revert(); expectLastCall();
reader.close(); expectLastCall();
reader.open(executionContext(chunkContext));
expect(reader.read()).andReturn(DOCUMENT_3);
expect(documentDestination.open(DOCUMENT_3)).andReturn(documentItemWriter3);
documentItemWriter3.write(DOCUMENT_3); expectLastCall();
documentItemWriter3.close(); expectLastCall();
expect(reader.read()).andReturn(null);
reader.close(); expectLastCall();
replayAll();
RepeatStatus repeatStatus = documentExporter.execute(null, this.chunkContext);
assertThat(repeatStatus, is(RepeatStatus.CONTINUABLE));
repeatStatus = documentExporter.execute(null, this.chunkContext);
assertThat(repeatStatus, is(RepeatStatus.FINISHED));
}
}