blob: 7ee30a7c0054b2014e09e4fe22301f68e72c68b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.junit.jupiter.api.Assertions;
public class MockFlowFile implements FlowFileRecord {
private final Map<String, String> attributes = new HashMap<>();
private final long id;
private final long entryDate;
private final long creationTime;
private boolean penalized = false;
private byte[] data = new byte[0];
private long lastEnqueuedDate = 0;
private long enqueuedIndex = 0;
public MockFlowFile(final long id) {
this.creationTime = System.nanoTime();
this.id = id;
entryDate = System.currentTimeMillis();
lastEnqueuedDate = entryDate;
attributes.put(CoreAttributes.FILENAME.key(), System.nanoTime() + ".mockFlowFile");
attributes.put(CoreAttributes.PATH.key(), "target");
final String uuid = UUID.randomUUID().toString();
attributes.put(CoreAttributes.UUID.key(), uuid);
}
public MockFlowFile(final long id, final FlowFile toCopy) {
this.creationTime = System.nanoTime();
this.id = id;
entryDate = System.currentTimeMillis();
final Map<String, String> attributesToCopy = toCopy.getAttributes();
String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());
if (filename == null) {
filename = System.nanoTime() + ".mockFlowFile";
}
attributes.put(CoreAttributes.FILENAME.key(), filename);
String path = attributesToCopy.get(CoreAttributes.PATH.key());
if (path == null) {
path = "target";
}
attributes.put(CoreAttributes.PATH.key(), path);
String uuid = attributesToCopy.get(CoreAttributes.UUID.key());
if (uuid == null) {
uuid = UUID.randomUUID().toString();
}
attributes.put(CoreAttributes.UUID.key(), uuid);
attributes.putAll(toCopy.getAttributes());
final byte[] dataToCopy = ((MockFlowFile) toCopy).data;
this.data = new byte[dataToCopy.length];
System.arraycopy(dataToCopy, 0, this.data, 0, dataToCopy.length);
this.penalized = toCopy.isPenalized();
}
void setPenalized(boolean penalized) {
this.penalized = penalized;
}
public long getCreationTime() {
return creationTime;
}
@Override
public long getLineageStartDate() {
return entryDate;
}
@Override
public int compareTo(final FlowFile o) {
return getAttribute(CoreAttributes.UUID.key()).compareTo(o.getAttribute(CoreAttributes.UUID.key()));
}
@Override
public String getAttribute(final String attrName) {
return attributes.get(attrName);
}
@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
@Override
public long getEntryDate() {
return entryDate;
}
@Override
public long getId() {
return id;
}
@Override
public long getSize() {
return data.length;
}
/**
* Sets the value of the internal content for this mock flowfile. Would not exist in standard {@link FlowFile} implementations, but useful for complex test assertions.
*
* @param data the flowfile content as a byte[]
*/
public void setData(final byte[] data) {
this.data = data;
}
/**
* Returns the value of the internal content for this mock flowfile. Would not exist in standard {@link FlowFile} implementations, but useful for complex test assertions.
*
* @return the internal flowfile content as a byte[]
*/
public byte[] getData() {
return this.data;
}
/**
* Returns the value of the internal content for this mock flowfile as a UTF-8 encoded String. Would not exist in standard {@link FlowFile} implementations, but useful for complex test assertions.
*
* @return the internal flowfile content as a String
*/
public String getContent() {
return new String(getData(), StandardCharsets.UTF_8);
}
/**
* Returns the value of the internal content for this mock flowfile as an {@link InputStream}. Would not exist in standard {@link FlowFile} implementations, but useful for complex test assertions.
*
* @return the internal flowfile content as a stream
*/
public InputStream getContentStream() {
return new ByteArrayInputStream(getData());
}
@Override
public boolean isPenalized() {
return penalized;
}
public void putAttributes(final Map<String, String> attrs) {
attributes.putAll(attrs);
}
public void removeAttributes(final Set<String> attrNames) {
for (final String attrName : attrNames) {
attributes.remove(attrName);
}
}
@Override
public String toString() {
return "FlowFile[" + id + "," + getAttribute(CoreAttributes.FILENAME.key()) + "," + getSize() + "B]";
}
@Override
public int hashCode() {
return new HashCodeBuilder(7, 13).append(id).toHashCode();
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj instanceof FlowFile) {
return ((FlowFile) obj).getId() == this.id;
}
return false;
}
public void assertAttributeExists(final String attributeName) {
Assertions.assertTrue(attributes.containsKey(attributeName), "Attribute " + attributeName + " does not exist");
}
public void assertAttributeNotExists(final String attributeName) {
Assertions.assertFalse(attributes.containsKey(attributeName), "Attribute " + attributeName + " should not exist on FlowFile, but exists with value "
+ attributes.get(attributeName));
}
public void assertAttributeEquals(final String attributeName, final String expectedValue) {
Assertions.assertEquals(expectedValue, attributes.get(attributeName), "Expected attribute " + attributeName + " to be " +
expectedValue + " but instead it was " + attributes.get(attributeName));
}
public void assertAttributeNotEquals(final String attributeName, final String expectedValue) {
Assertions.assertNotSame(expectedValue, attributes.get(attributeName));
}
/**
* Asserts that the content of this FlowFile is the same as the content of
* the given file
*
* @param file to compare content against
* @throws IOException if fails doing IO during comparison
*/
public void assertContentEquals(final File file) throws IOException {
assertContentEquals(file.toPath());
}
/**
* Asserts that the content of this FlowFile is the same as the content of
* the given path
*
* @param path where to find content to compare to
* @throws IOException if io error occurs while comparing content
*/
public void assertContentEquals(final Path path) throws IOException {
try (final InputStream in = Files.newInputStream(path, StandardOpenOption.READ)) {
assertContentEquals(in);
}
}
/**
* Asserts that the content of this FlowFile is the same as the content of
* the given byte array
*
* @param data the data to compare
* @throws IOException if any ioe occurs while reading flowfile
*/
public void assertContentEquals(final byte[] data) throws IOException {
try (final InputStream in = new ByteArrayInputStream(data)) {
assertContentEquals(in);
}
}
public void assertContentEquals(final String data) {
assertContentEquals(data, "UTF-8");
}
public void assertContentEquals(final String data, final String charset) {
assertContentEquals(data, Charset.forName(charset));
}
public void assertContentEquals(final String data, final Charset charset) {
final String value = new String(this.data, charset);
Assertions.assertEquals(data, value);
}
/**
* Asserts that the content of this FlowFile is the same as the content of
* the given InputStream. This method closes the InputStream when it is
* finished.
*
* @param in the stream to source comparison data from
* @throws IOException if any issues reading from given source
*/
public void assertContentEquals(final InputStream in) throws IOException {
int bytesRead = 0;
try (final BufferedInputStream buffered = new BufferedInputStream(in)) {
for (int i = 0; i < data.length; i++) {
final int fromStream = buffered.read();
if (fromStream < 0) {
Assertions.fail("FlowFile content is " + data.length + " bytes but provided input is only " + bytesRead + " bytes");
}
if ((fromStream & 0xFF) != (data[i] & 0xFF)) {
Assertions.fail("FlowFile content differs from input at byte " + bytesRead + " with input having value "
+ (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF));
}
bytesRead++;
}
final int nextByte = buffered.read();
if (nextByte >= 0) {
Assertions.fail("Contents of input and FlowFile were the same through byte " + data.length + "; however, FlowFile's content ended at this point, and input has more data");
}
}
}
/**
* @return a copy of the the contents of the FlowFile as a byte array
*/
public byte[] toByteArray() {
return Arrays.copyOf(this.data, this.data.length);
}
@Override
public Long getLastQueueDate() {
return lastEnqueuedDate;
}
public void setLastEnqueuedDate(long lastEnqueuedDate) {
this.lastEnqueuedDate = lastEnqueuedDate;
}
@Override
public long getPenaltyExpirationMillis() {
return -1;
}
@Override
public ContentClaim getContentClaim() {
return null;
}
@Override
public long getContentClaimOffset() {
return 0;
}
@Override
public long getLineageStartIndex() {
return 0;
}
@Override
public long getQueueDateIndex() {
return enqueuedIndex;
}
public void setEnqueuedIndex(long enqueuedIndex) {
this.enqueuedIndex = enqueuedIndex;
}
public boolean isAttributeEqual(final String attributeName, final String expectedValue) {
// unknown attribute name, so cannot be equal.
if (attributes.containsKey(attributeName) == false) {
return false;
}
String value = attributes.get(attributeName);
return Objects.equals(expectedValue, value);
}
public boolean isContentEqual(String expected) {
return isContentEqual(expected, StandardCharsets.UTF_8);
}
public boolean isContentEqual(String expected, final Charset charset) {
final String value = new String(this.data, charset);
return Objects.equals(expected, value);
}
public boolean isContentEqual(final byte[] expected) {
return Arrays.equals(expected, this.data);
}
}