/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.hadoop;

import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
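
/**
 * Unit tests for the GetHDFS processor: path-difference computation, property
 * validation, file filtering, compression handling, Expression Language in the
 * Directory property, and wrapping of filesystem access in UserGroupInformation.
 */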
public class GetHDFSTest {
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;

@BeforeClass
public static void setUpSuite() {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
}

@Before
public void setup() {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
}
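
// getPathDifference() should return the directory segments between the configured
// root and the file's parent, or an empty string when there are none; a relative
// root may match at any depth of the file's absolute path.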
@Test
public void getPathDifferenceTest() {
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/root"), new Path("/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/root"), new Path("/root/file")));
Assert.assertEquals("one", GetHDFS.getPathDifference(new Path("/root"), new Path("/root/one/file")));
Assert.assertEquals("one/two", GetHDFS.getPathDifference(new Path("/root"), new Path("/root/one/two/file")));
Assert.assertEquals("one/two/three", GetHDFS.getPathDifference(new Path("/root"), new Path("/root/one/two/three/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("root"), new Path("/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("root"), new Path("/root/file")));
Assert.assertEquals("one", GetHDFS.getPathDifference(new Path("root"), new Path("/root/one/file")));
Assert.assertEquals("one/two", GetHDFS.getPathDifference(new Path("root"), new Path("/root/one/two/file")));
Assert.assertEquals("one/two/three", GetHDFS.getPathDifference(new Path("root"), new Path("/base/root/one/two/three/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/foo/bar"), new Path("/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/foo/bar"), new Path("/foo/bar/file")));
Assert.assertEquals("one", GetHDFS.getPathDifference(new Path("/foo/bar"), new Path("/foo/bar/one/file")));
Assert.assertEquals("one/two", GetHDFS.getPathDifference(new Path("/foo/bar"), new Path("/foo/bar/one/two/file")));
Assert.assertEquals("one/two/three", GetHDFS.getPathDifference(new Path("/foo/bar"), new Path("/foo/bar/one/two/three/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/file")));
Assert.assertEquals("", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/foo/bar/file")));
Assert.assertEquals("one", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/foo/bar/one/file")));
Assert.assertEquals("one/two", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/foo/bar/one/two/file")));
Assert.assertEquals("one/two/three", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/base/foo/bar/one/two/three/file")));
Assert.assertEquals("one/two/three", GetHDFS.getPathDifference(new Path("foo/bar"), new Path("/base/base2/base3/foo/bar/one/two/three/file")));
}
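
// The processor should be invalid when Directory is unset, and when
// Minimum File Age is greater than Maximum File Age.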
@Test
public void testValidators() {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
results = new HashSet<>();
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Directory is required"));
}
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "target");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "/target");
runner.setProperty(GetHDFS.MIN_AGE, "10 secs");
runner.setProperty(GetHDFS.MAX_AGE, "5 secs");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
}
}
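
// The File Filter Regex should limit pickup to the four testdata files whose
// names start with "random".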
@Test
public void testGetFilesWithFilter() {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(4, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("random"));
}
}
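
// Listing a non-existent directory (with EL in the path) should produce no
// flowfiles rather than failing.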
@Test
public void testDirectoryDoesNotExist() {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "does/not/exist/${now():format('yyyyMMdd')}");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(0, flowFiles.size());
}
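
// With COMPRESSION_CODEC set to AUTOMATIC, the .gz file should be decompressed
// and the .gz suffix dropped from the filename attribute.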
@Test
public void testAutomaticDecompression() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected);
}
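
// With COMPRESSION_CODEC forced to NONE, the .gz file should be transferred
// as-is, extension intact.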
@Test
public void testInferCompressionCodecDisabled() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
flowFile.assertContentEquals(expected);
}
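
// AUTOMATIC inference should pass a .zip file through unchanged, since .zip is
// not a Hadoop compression codec.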
@Test
public void testFileExtensionNotACompressionCodec() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
}
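
// Valid Expression Language in Directory should resolve to the testdata
// directory, and a single RECEIVE provenance event should be recorded.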
@Test
public void testDirectoryUsesValidEL() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/${literal('testdata'):substring(0,8)}");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord receiveEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, receiveEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(receiveEvent.getTransitUri().endsWith("13545423550275052.zip"));
}
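
// An unterminated expression (the closing brace appears URL-encoded as %7D, so
// the ${...} never closes) should fail validation.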
@Test
public void testDirectoryUsesUnrecognizedEL() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.assertNotValid();
}
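
// A call to an undefined Expression Language function (foo) should fail validation.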
@Test
public void testDirectoryUsesInvalidEL() throws IOException {
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetHDFS.DIRECTORY, "data_${literal('testing'):foo()}");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.assertNotValid();
}
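
// The next two tests verify that the directory-existence check and the listing
// run inside UserGroupInformation.doAs(), for both existing and missing directories.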
@Test
public void testDirectoryCheckWrappedInUGICallWhenDirectoryExists() throws IOException, InterruptedException {
// GIVEN, WHEN
boolean directoryExists = true;
// THEN
directoryExistsWrappedInUGICall(directoryExists);
}

@Test
public void testDirectoryCheckWrappedInUGICallWhenDirectoryDoesNotExist() throws IOException, InterruptedException {
// GIVEN, WHEN
boolean directoryExists = false;
// THEN
directoryExistsWrappedInUGICall(directoryExists);
}
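
// Runs the processor against a mocked FileSystem and UserGroupInformation. The
// Answer executes each PrivilegedExceptionAction and immediately verifies that
// the corresponding FileSystem call (exists() on the first invocation, then
// listStatus() when the directory exists) happened inside doAs(); the final
// verifyNoMoreInteractions() confirms no filesystem access occurred outside it.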
private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOException, InterruptedException {
// GIVEN
FileSystem mockFileSystem = mock(FileSystem.class);
UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
TestRunner runner = TestRunners.newTestRunner(testSubject);
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
// WHEN
Answer<?> answer = new Answer<Object>() {
private int callCounter = 0;
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
final Object result;
if (callCounter == 0) {
when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
verify(mockFileSystem).exists(any(Path.class));
} else {
when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
verify(mockFileSystem).listStatus(any(Path.class));
}
++callCounter;
return result;
}
};
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(answer);
runner.run();
// THEN
verify(mockFileSystem).getUri();
verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
}
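
// GetHDFS subclass that overrides getKerberosProperties() so tests can supply
// their own KerberosProperties.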
private static class TestableGetHDFS extends GetHDFS {
private final KerberosProperties testKerberosProperties;
public TestableGetHDFS(KerberosProperties testKerberosProperties) {
this.testKerberosProperties = testKerberosProperties;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}
}
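
// TestableGetHDFS subclass that additionally swaps in a mocked FileSystem and
// UserGroupInformation for the UGI-wrapping tests.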
private static class TestableGetHDFSForUGI extends TestableGetHDFS {
private FileSystem mockFileSystem;
private UserGroupInformation mockUserGroupInformation;
public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
super(testKerberosProperties);
this.mockFileSystem = mockFileSystem;
this.mockUserGroupInformation = mockUserGroupInformation;
}
@Override
protected FileSystem getFileSystem() {
return mockFileSystem;
}
@Override
protected UserGroupInformation getUserGroupInformation() {
return mockUserGroupInformation;
}
}
}