| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller.repository; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.controller.FlowFileQueue; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.StandardFlowFileQueue; |
| import org.apache.nifi.controller.repository.claim.ContentClaim; |
| import org.apache.nifi.controller.repository.claim.ContentClaimManager; |
| import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.MissingFlowFileException; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.io.InputStreamCallback; |
| import org.apache.nifi.processor.io.OutputStreamCallback; |
| import org.apache.nifi.processor.io.StreamCallback; |
| import org.apache.nifi.provenance.MockProvenanceEventRepository; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.provenance.ProvenanceEventRepository; |
| import org.apache.nifi.provenance.ProvenanceEventType; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestStandardProcessSession { |
| |
| private StandardProcessSession session; |
| private MockContentRepository contentRepo; |
| private FlowFileQueue flowFileQueue; |
| |
| private ProvenanceEventRepository provenanceRepo; |
| private MockFlowFileRepository flowFileRepo; |
| |
| @After |
| public void cleanup() { |
| session.rollback(); |
| |
| final File repoDir = new File("target/contentRepo"); |
| rmDir(repoDir); |
| } |
| |
| private void rmDir(final File dir) { |
| if (dir.isDirectory()) { |
| final File[] children = dir.listFiles(); |
| if (children != null) { |
| for (final File child : children) { |
| rmDir(child); |
| } |
| } |
| } |
| |
| boolean removed = false; |
| for (int i = 0; i < 100; i++) { |
| removed = dir.delete(); |
| if (removed) { |
| break; |
| } |
| } |
| |
| if (!removed && dir.exists()) { |
| // we fail in this situation because it generally means that the StandardProcessSession did not |
| // close the OutputStream. |
| Assert.fail("Could not clean up content repo: " + dir + " could not be removed"); |
| } |
| } |
| |
| @Before |
| @SuppressWarnings("unchecked") |
| public void setup() throws IOException { |
| System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); |
| final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); |
| final CounterRepository counterRepo = Mockito.mock(CounterRepository.class); |
| provenanceRepo = new MockProvenanceEventRepository(); |
| |
| final Connection connection = Mockito.mock(Connection.class); |
| final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); |
| |
| flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, 10000); |
| when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); |
| |
| Mockito.doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| flowFileQueue.put((FlowFileRecord) invocation.getArguments()[0]); |
| return null; |
| } |
| }).when(connection).enqueue(Mockito.any(FlowFileRecord.class)); |
| |
| Mockito.doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| flowFileQueue.putAll((Collection<FlowFileRecord>) invocation.getArguments()[0]); |
| return null; |
| } |
| }).when(connection).enqueue(Mockito.any(Collection.class)); |
| |
| final Connectable dest = Mockito.mock(Connectable.class); |
| when(connection.getDestination()).thenReturn(dest); |
| when(connection.getSource()).thenReturn(dest); |
| |
| final List<Connection> connList = new ArrayList<>(); |
| connList.add(connection); |
| |
| final ProcessGroup procGroup = Mockito.mock(ProcessGroup.class); |
| when(procGroup.getIdentifier()).thenReturn("proc-group-identifier-1"); |
| |
| final Connectable connectable = Mockito.mock(Connectable.class); |
| when(connectable.hasIncomingConnection()).thenReturn(true); |
| when(connectable.getIncomingConnections()).thenReturn(connList); |
| when(connectable.getProcessGroup()).thenReturn(procGroup); |
| when(connectable.getIdentifier()).thenReturn("connectable-1"); |
| when(connectable.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT); |
| |
| Mockito.doAnswer(new Answer<Set<Connection>>() { |
| @Override |
| public Set<Connection> answer(final InvocationOnMock invocation) throws Throwable { |
| final Object[] arguments = invocation.getArguments(); |
| final Relationship relationship = (Relationship) arguments[0]; |
| if (relationship == Relationship.SELF) { |
| return Collections.emptySet(); |
| } else { |
| return new HashSet<>(connList); |
| } |
| } |
| }).when(connectable).getConnections(Mockito.any(Relationship.class)); |
| when(connectable.getConnections()).thenReturn(new HashSet<>(connList)); |
| |
| contentRepo = new MockContentRepository(); |
| contentRepo.initialize(new StandardContentClaimManager()); |
| flowFileRepo = new MockFlowFileRepository(); |
| |
| final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); |
| session = new StandardProcessSession(context); |
| } |
| |
| @Test |
| public void testModifyContentThenRollback() throws IOException { |
| assertEquals(0, contentRepo.getExistingClaims().size()); |
| |
| final ContentClaim claim = contentRepo.create(false); |
| assertEquals(1, contentRepo.getExistingClaims().size()); |
| |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .contentClaim(claim) |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile flowFile = session.get(); |
| assertNotNull(flowFile); |
| flowFile = session.putAttribute(flowFile, "filename", "1.txt"); |
| flowFile = session.write(flowFile, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| session.transfer(flowFile); |
| session.commit(); |
| assertEquals(1, contentRepo.getExistingClaims().size()); |
| |
| flowFile = session.get(); |
| assertNotNull(flowFile); |
| flowFile = session.write(flowFile, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| session.transfer(flowFile); |
| session.commit(); |
| |
| assertEquals(1, contentRepo.getExistingClaims().size()); |
| |
| flowFile = session.get(); |
| assertNotNull(flowFile); |
| session.remove(flowFile); |
| session.rollback(); |
| assertEquals(1, contentRepo.getExistingClaims().size()); |
| |
| flowFile = session.get(); |
| assertNotNull(flowFile); |
| session.remove(flowFile); |
| session.commit(); |
| assertEquals(0, contentRepo.getExistingClaims().size()); |
| } |
| |
| @Test |
| public void testCreateThenRollbackRemovesContent() throws IOException { |
| |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| final StreamCallback nop = new StreamCallback() { |
| @Override |
| public void process(InputStream in, OutputStream out) throws IOException { |
| } |
| }; |
| |
| session.create(); |
| FlowFile flowFile = session.create(flowFileRecord); |
| flowFile = session.write(flowFile, nop); |
| |
| FlowFile flowFile2 = session.create(flowFileRecord); |
| flowFile2 = session.write(flowFile2, nop); |
| |
| session.write(flowFile2, nop); |
| |
| FlowFile flowFile3 = session.create(); |
| session.write(flowFile3, nop); |
| |
| session.rollback(); |
| assertEquals(4, contentRepo.getClaimsRemoved()); |
| } |
| |
| @Test |
| public void testForksNotEmittedIfFilesDeleted() throws IOException { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile orig = session.get(); |
| FlowFile newFlowFile = session.create(orig); |
| session.remove(newFlowFile); |
| session.commit(); |
| |
| assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); |
| } |
| |
| |
| @Test |
| public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile orig = session.get(); |
| FlowFile newFlowFile = session.create(orig); |
| session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); // 1 event for both parents and children |
| } |
| |
| @Test |
| public void testProvenanceEventsEmittedForRemove() throws IOException { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile orig = session.get(); |
| FlowFile newFlowFile = session.create(orig); |
| FlowFile secondNewFlowFile = session.create(orig); |
| session.remove(newFlowFile); |
| session.transfer(secondNewFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); |
| } |
| |
| @Test |
| public void testUpdateAttributesThenJoin() throws IOException { |
| final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() |
| .id(1L) |
| .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() |
| .id(2L) |
| .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| flowFileQueue.put(flowFileRecord1); |
| flowFileQueue.put(flowFileRecord2); |
| |
| FlowFile ff1 = session.get(); |
| FlowFile ff2 = session.get(); |
| |
| ff1 = session.putAttribute(ff1, "index", "1"); |
| ff2 = session.putAttribute(ff2, "index", "2"); |
| |
| final List<FlowFile> parents = new ArrayList<>(2); |
| parents.add(ff1); |
| parents.add(ff2); |
| |
| final FlowFile child = session.create(parents); |
| |
| final Relationship rel = new Relationship.Builder().name("A").build(); |
| |
| session.transfer(ff1, rel); |
| session.transfer(ff2, rel); |
| session.transfer(child, rel); |
| |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); |
| |
| // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's |
| assertEquals(3, events.size()); |
| |
| int joinCount = 0; |
| int ff1UpdateCount = 0; |
| int ff2UpdateCount = 0; |
| |
| for ( final ProvenanceEventRecord event : events ) { |
| switch (event.getEventType()) { |
| case JOIN: |
| assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); |
| joinCount++; |
| break; |
| case ATTRIBUTES_MODIFIED: |
| if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { |
| ff1UpdateCount++; |
| } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { |
| ff2UpdateCount++; |
| } else { |
| Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); |
| } |
| break; |
| default: |
| Assert.fail("Unexpected event type: " + event); |
| } |
| } |
| |
| assertEquals(1, joinCount); |
| assertEquals(1, ff1UpdateCount); |
| assertEquals(1, ff2UpdateCount); |
| |
| assertEquals(1, joinCount); |
| } |
| |
| @Test |
| public void testForkOneToOneReported() throws IOException { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .build(); |
| |
| flowFileQueue.put(flowFileRecord); |
| |
| // we have to increment the ID generator because we are creating a FlowFile without the FlowFile Repository's knowledge |
| flowFileRepo.idGenerator.getAndIncrement(); |
| |
| FlowFile orig = session.get(); |
| FlowFile newFlowFile = session.create(orig); |
| session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); |
| session.getProvenanceReporter().fork(newFlowFile, Collections.singleton(orig)); |
| session.remove(orig); |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); |
| assertEquals(2, events.size()); |
| |
| final ProvenanceEventRecord firstRecord = events.get(0); |
| final ProvenanceEventRecord secondRecord = events.get(1); |
| |
| assertEquals(ProvenanceEventType.FORK, firstRecord.getEventType()); |
| assertEquals(ProvenanceEventType.DROP, secondRecord.getEventType()); |
| } |
| |
| @Test |
| public void testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() { |
| FlowFile ff1 = session.create(); |
| |
| final RuntimeException runtime = new RuntimeException(); |
| try { |
| session.write(ff1, new OutputStreamCallback() { |
| @Override |
| public void process(final OutputStream out) throws IOException { |
| throw runtime; |
| } |
| }); |
| Assert.fail("Should have thrown RuntimeException"); |
| } catch (final RuntimeException re) { |
| assertTrue(runtime == re); |
| } |
| |
| final IOException ioe = new IOException(); |
| try { |
| session.write(ff1, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| throw ioe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe) { |
| assertTrue(ioe == pe.getCause()); |
| } |
| |
| final ProcessException pe = new ProcessException(); |
| try { |
| session.write(ff1, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| throw pe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe2) { |
| assertTrue(pe == pe2); |
| } |
| } |
| |
| @Test |
| public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() { |
| FlowFile ff1 = session.create(); |
| |
| final RuntimeException runtime = new RuntimeException(); |
| try { |
| session.write(ff1, new StreamCallback() { |
| @Override |
| public void process(final InputStream in, final OutputStream out) throws IOException { |
| throw runtime; |
| } |
| }); |
| Assert.fail("Should have thrown RuntimeException"); |
| } catch (final RuntimeException re) { |
| assertTrue(runtime == re); |
| } |
| |
| final IOException ioe = new IOException(); |
| try { |
| session.write(ff1, new StreamCallback() { |
| @Override |
| public void process(final InputStream in, OutputStream out) throws IOException { |
| throw ioe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe) { |
| assertTrue(ioe == pe.getCause()); |
| } |
| |
| final ProcessException pe = new ProcessException(); |
| try { |
| session.write(ff1, new StreamCallback() { |
| @Override |
| public void process(final InputStream in, OutputStream out) throws IOException { |
| throw pe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe2) { |
| assertTrue(pe == pe2); |
| } |
| } |
| |
| @Test |
| public void testMissingFlowFileExceptionThrownWhenUnableToReadData() { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "x"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "x"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }) |
| .size(1L) |
| .build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| // attempt to read the data. |
| try { |
| FlowFile ff1 = session.get(); |
| |
| session.read(ff1, new InputStreamCallback() { |
| @Override |
| public void process(InputStream in) throws IOException { |
| } |
| }); |
| Assert.fail("Expected MissingFlowFileException"); |
| } catch (final MissingFlowFileException mffe) { |
| } |
| } |
| |
| @Test |
| public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "x"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "x"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }) |
| .size(1L) |
| .build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| // attempt to read the data. |
| try { |
| FlowFile ff1 = session.get(); |
| |
| session.write(ff1, new StreamCallback() { |
| @Override |
| public void process(InputStream in, OutputStream out) throws IOException { |
| } |
| }); |
| Assert.fail("Expected MissingFlowFileException"); |
| } catch (final MissingFlowFileException mffe) { |
| } |
| } |
| |
| @Test |
| public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "container"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "section"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }) |
| .build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile ff1 = session.get(); |
| ff1 = session.write(ff1, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| session.transfer(ff1); |
| session.commit(); |
| |
| final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "container"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "section"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }) |
| .contentClaimOffset(1000L) |
| .size(1000L) |
| .build(); |
| flowFileQueue.put(flowFileRecord2); |
| |
| // attempt to read the data. |
| try { |
| session.get(); |
| FlowFile ff2 = session.get(); |
| session.write(ff2, new StreamCallback() { |
| @Override |
| public void process(InputStream in, OutputStream out) throws IOException { |
| } |
| }); |
| Assert.fail("Expected ContentNotFoundException"); |
| } catch (final MissingFlowFileException mffe) { |
| } |
| } |
| |
| @Test |
| public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { |
| final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "container"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "section"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }).build(); |
| flowFileQueue.put(flowFileRecord); |
| |
| FlowFile ff1 = session.get(); |
| ff1 = session.write(ff1, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| session.transfer(ff1); |
| session.commit(); |
| |
| final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() |
| .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") |
| .entryDate(System.currentTimeMillis()) |
| .contentClaim(new ContentClaim() { |
| @Override |
| public int compareTo(ContentClaim arg0) { |
| return 0; |
| } |
| |
| @Override |
| public String getId() { |
| return "0"; |
| } |
| |
| @Override |
| public String getContainer() { |
| return "container"; |
| } |
| |
| @Override |
| public String getSection() { |
| return "section"; |
| } |
| |
| @Override |
| public boolean isLossTolerant() { |
| return true; |
| } |
| }) |
| .contentClaimOffset(1000L).size(1L).build(); |
| flowFileQueue.put(flowFileRecord2); |
| |
| // attempt to read the data. |
| try { |
| session.get(); |
| FlowFile ff2 = session.get(); |
| session.read(ff2, new InputStreamCallback() { |
| @Override |
| public void process(InputStream in) throws IOException { |
| } |
| }); |
| Assert.fail("Expected MissingFlowFileException"); |
| } catch (final MissingFlowFileException mffe) { |
| } |
| } |
| |
| @Test |
| public void testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() { |
| FlowFile ff1 = session.create(); |
| |
| final RuntimeException runtime = new RuntimeException(); |
| try { |
| session.read(ff1, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream in) throws IOException { |
| throw runtime; |
| } |
| }); |
| Assert.fail("Should have thrown RuntimeException"); |
| } catch (final RuntimeException re) { |
| assertTrue(runtime == re); |
| } |
| |
| final IOException ioe = new IOException(); |
| try { |
| session.read(ff1, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream in) throws IOException { |
| throw ioe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe) { |
| assertTrue(ioe == pe.getCause()); |
| } |
| |
| final ProcessException pe = new ProcessException(); |
| try { |
| session.read(ff1, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream in) throws IOException { |
| throw pe; |
| } |
| }); |
| Assert.fail("Should have thrown ProcessException"); |
| } catch (final ProcessException pe2) { |
| assertTrue(pe == pe2); |
| } |
| } |
| |
| |
| @Test |
| public void testCreateEmitted() throws IOException { |
| FlowFile newFlowFile = session.create(); |
| session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); |
| assertFalse(events.isEmpty()); |
| assertEquals(1, events.size()); |
| |
| final ProvenanceEventRecord event = events.get(0); |
| assertEquals(ProvenanceEventType.CREATE, event.getEventType()); |
| } |
| |
| @Test |
| public void testContentModifiedNotEmittedForCreate() throws IOException { |
| FlowFile newFlowFile = session.create(); |
| newFlowFile = session.write(newFlowFile, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); |
| assertFalse(events.isEmpty()); |
| assertEquals(1, events.size()); |
| |
| final ProvenanceEventRecord event = events.get(0); |
| assertEquals(ProvenanceEventType.CREATE, event.getEventType()); |
| } |
| |
| @Test |
| public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { |
| final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() |
| .id(1L) |
| .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") |
| .build(); |
| this.flowFileQueue.put(flowFile); |
| |
| FlowFile existingFlowFile = session.get(); |
| existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { |
| @Override |
| public void process(OutputStream out) throws IOException { |
| } |
| }); |
| existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); |
| session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); |
| assertFalse(events.isEmpty()); |
| assertEquals(1, events.size()); |
| |
| final ProvenanceEventRecord event = events.get(0); |
| assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); |
| } |
| |
| @Test |
| public void testAttributesModifiedEmitted() throws IOException { |
| final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() |
| .id(1L) |
| .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") |
| .build(); |
| this.flowFileQueue.put(flowFile); |
| |
| FlowFile existingFlowFile = session.get(); |
| existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); |
| session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); |
| session.commit(); |
| |
| final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); |
| assertFalse(events.isEmpty()); |
| assertEquals(1, events.size()); |
| |
| final ProvenanceEventRecord event = events.get(0); |
| assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); |
| } |
| |
| |
| |
| private static class MockFlowFileRepository implements FlowFileRepository { |
| |
| private final AtomicLong idGenerator = new AtomicLong(0L); |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public long getNextFlowFileSequence() { |
| return idGenerator.getAndIncrement(); |
| } |
| |
| @Override |
| public long getMaxFlowFileIdentifier() throws IOException { |
| return 0L; |
| } |
| |
| @Override |
| public void updateRepository(Collection<RepositoryRecord> records) throws IOException { |
| } |
| |
| @Override |
| public long getStorageCapacity() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public long getUsableStorageSpace() throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public boolean isVolatile() { |
| return false; |
| } |
| |
| @Override |
| public long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue queue) throws IOException { |
| } |
| |
| @Override |
| public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue queue, String swapLocation) throws IOException { |
| } |
| |
| @Override |
| public void initialize(ContentClaimManager claimManager) throws IOException { |
| } |
| } |
| |
| private static class MockContentRepository implements ContentRepository { |
| |
| private final AtomicLong idGenerator = new AtomicLong(0L); |
| private final AtomicLong claimsRemoved = new AtomicLong(0L); |
| private ContentClaimManager claimManager; |
| |
| private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); |
| |
| @Override |
| public void shutdown() { |
| } |
| |
| public Set<ContentClaim> getExistingClaims() { |
| final Set<ContentClaim> claims = new HashSet<>(); |
| |
| for (long i = 0; i < idGenerator.get(); i++) { |
| final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(i), false); |
| if (getClaimantCount(claim) > 0) { |
| claims.add(claim); |
| } |
| } |
| |
| return claims; |
| } |
| |
| @Override |
| public ContentClaim create(boolean lossTolerant) throws IOException { |
| final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); |
| claimantCounts.put(claim, new AtomicInteger(1)); |
| return claim; |
| } |
| |
| @Override |
| public int incrementClaimaintCount(ContentClaim claim) { |
| final AtomicInteger count = claimantCounts.get(claim); |
| if (count == null) { |
| throw new IllegalArgumentException("Unknown Claim: " + claim); |
| } |
| return count.incrementAndGet(); |
| } |
| |
| @Override |
| public int getClaimantCount(ContentClaim claim) { |
| final AtomicInteger count = claimantCounts.get(claim); |
| if (count == null) { |
| throw new IllegalArgumentException("Unknown Claim: " + claim); |
| } |
| return count.get(); |
| } |
| |
| public long getClaimsRemoved() { |
| return claimsRemoved.get(); |
| } |
| |
| @Override |
| public long getContainerCapacity(String containerName) throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public Set<String> getContainerNames() { |
| return new HashSet<String>(); |
| } |
| |
| @Override |
| public long getContainerUsableSpace(String containerName) throws IOException { |
| return 0; |
| } |
| |
| @Override |
| public int decrementClaimantCount(ContentClaim claim) { |
| if (claim == null) { |
| return 0; |
| } |
| |
| final AtomicInteger count = claimantCounts.get(claim); |
| if (count == null) { |
| return 0; |
| } |
| final int newClaimantCount = count.decrementAndGet(); |
| |
| if (newClaimantCount < 0) { |
| throw new IllegalStateException("Content Claim removed, resulting in a claimant count of " + newClaimantCount + " for " + claim); |
| } |
| |
| claimsRemoved.getAndIncrement(); |
| |
| return newClaimantCount; |
| } |
| |
| @Override |
| public boolean remove(ContentClaim claim) { |
| return true; |
| } |
| |
| @Override |
| public ContentClaim clone(ContentClaim original, boolean lossTolerant) throws IOException { |
| return null; |
| } |
| |
| @Override |
| public long merge(Collection<ContentClaim> claims, ContentClaim destination, byte[] header, byte[] footer, byte[] demarcator) throws IOException { |
| return 0; |
| } |
| |
| private Path getPath(final ContentClaim claim) { |
| return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId()); |
| } |
| |
| @Override |
| public long importFrom(Path content, ContentClaim claim) throws IOException { |
| Files.copy(content, getPath(claim)); |
| return Files.size(content); |
| } |
| |
| @Override |
| public long importFrom(Path content, ContentClaim claim, boolean append) throws IOException { |
| if (append) { |
| throw new UnsupportedOperationException(); |
| } |
| return importFrom(content, claim); |
| } |
| |
| @Override |
| public long importFrom(InputStream content, ContentClaim claim) throws IOException { |
| Files.copy(content, getPath(claim)); |
| return Files.size(getPath(claim)); |
| } |
| |
| @Override |
| public long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException { |
| if (append) { |
| throw new UnsupportedOperationException(); |
| } |
| return importFrom(content, claim); |
| } |
| |
| @Override |
| public long exportTo(ContentClaim claim, Path destination, boolean append) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long exportTo(ContentClaim claim, Path destination, boolean append, long offset, long length) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long exportTo(ContentClaim claim, OutputStream destination) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long size(ContentClaim claim) throws IOException { |
| return Files.size(getPath(claim)); |
| } |
| |
| @Override |
| public InputStream read(ContentClaim claim) throws IOException { |
| if (claim == null) { |
| return new ByteArrayInputStream(new byte[0]); |
| } |
| |
| try { |
| return new FileInputStream(getPath(claim).toFile()); |
| } catch (final FileNotFoundException fnfe) { |
| throw new ContentNotFoundException(claim, fnfe); |
| } |
| } |
| |
| @Override |
| public OutputStream write(ContentClaim claim) throws IOException { |
| final Path path = getPath(claim); |
| final File file = path.toFile(); |
| final File parentFile = file.getParentFile(); |
| |
| if (!parentFile.exists() && !parentFile.mkdirs()) { |
| throw new IOException("Unable to create directory " + parentFile.getAbsolutePath()); |
| } |
| return new FileOutputStream(file); |
| } |
| |
| @Override |
| public void purge() { |
| } |
| |
| @Override |
| public void cleanup() { |
| } |
| |
| @Override |
| public boolean isAccessible(ContentClaim contentClaim) throws IOException { |
| return true; |
| } |
| |
| @Override |
| public void initialize(ContentClaimManager claimManager) throws IOException { |
| this.claimManager = claimManager; |
| } |
| } |
| } |