blob: 5b3da7ba96a7a9df5c821d5cc36df7a7d0ef2d24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.writer.hdfs;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.task.TopologyContext;
import org.json.simple.JSONObject;
import org.junit.Rule;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
// Suppress ConstantConditions to avoid NPE warnings that only would occur on test failure anyway
@SuppressWarnings("ConstantConditions")
@EnableRuleMigrationSupport
public class HdfsWriterTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
private static final String SENSOR_NAME = "sensor";
private static final String WRITER_NAME = "writerName";
private File folder;
private FileNameFormat testFormat;
@BeforeAll
public static void beforeAll() throws Exception {
// See https://issues.apache.org/jira/browse/METRON-2036
// The need for this should go away when JUnit 4.13 is released and we can upgrade.
Thread.interrupted();
}
@BeforeEach
public void setup() throws IOException {
// Ensure each test has a unique folder to work with.
folder = tempFolder.newFolder();
testFormat = new DefaultFileNameFormat()
.withPath(folder.toString())
.withExtension(".json")
.withPrefix("prefix-");
}
@Test
public void testGetHdfsPathNull() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message);
writer.close();
assertEquals(SENSOR_NAME, result);
}
@Test
public void testGetHdfsPathEmptyString() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message);
writer.close();
assertEquals(SENSOR_NAME, result);
}
@Test
public void testGetHdfsPathConstant() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message);
writer.close();
assertEquals("new", result);
}
@Test
@SuppressWarnings("unchecked")
public void testGetHdfsPathDirectVariable() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "test.key", message);
writer.close();
assertEquals("test.value", result);
}
@Test
public void testGetHdfsPathFormatConstant() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('/test/folder/')", message);
writer.close();
assertEquals("/test/folder/", result);
}
@Test
@SuppressWarnings("unchecked")
public void testGetHdfsPathFormatVariable() {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
message.put("test.key.2", "test.value.2");
message.put("test.key.3", "test.value.3");
Object result = writer.getHdfsPathExtension(SENSOR_NAME,"FORMAT('%s/%s/%s', test.key, test.key.2, test.key.3)", message);
writer.close();
assertEquals("test.value/test.value.2/test.value.3", result);
}
@Test
public void testSetsCorrectHdfsFilename() {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
String filename = writer.fileNameFormat.getName(1,1);
assertEquals("prefix-Xcom-7-1-1.json", filename);
writer.close();
}
@Test
@SuppressWarnings("unchecked")
public void testGetHdfsPathMultipleFunctions() {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
message.put("test.key.2", "test.value.2");
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', test.key)", message);
assertEquals("test.value", result);
result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s/%s', test.key, test.key.2)", message);
assertEquals("test.value/test.value.2", result);
result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', test.key)", message);
writer.close();
assertEquals("test.value", result);
}
@Test
@SuppressWarnings("unchecked")
public void testGetHdfsPathStringReturned() {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "TO_UPPER(FORMAT(MAP_GET('key', {'key': 'AbC%s'}), test.key))", message);
writer.close();
assertEquals("ABCTEST.VALUE", result);
}
@Test
public void testGetHdfsPathNonString() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
try {
assertThrows(IllegalArgumentException.class, () -> writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message));
} finally {
writer.close();
}
}
@Test
public void testGetSourceHandlerOpenFilesMax() throws IOException {
int maxFiles = 2;
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
.withMaxOpenFiles(maxFiles);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
for(int i = 0; i < maxFiles; i++) {
writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
}
writer.close();
}
@Test
public void testGetSourceHandlerOpenFilesOverMax() throws IOException {
int maxFiles = 2;
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
.withMaxOpenFiles(maxFiles);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
for(int i = 0; i < maxFiles; i++) {
writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
}
// Should fail on max files + 1
try {
assertThrows(
IllegalStateException.class,
() -> writer.getSourceHandler(SENSOR_NAME, Integer.toString(maxFiles + 1), null));
} finally {
writer.close();
}
}
@Test
@SuppressWarnings("unchecked")
public void testWriteNoOutputFunction() throws Exception {
FileNameFormat format = new DefaultFileNameFormat()
.withPath(folder.toString())
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
message.put("test.key2", "test.value2");
JSONObject message2 = new JSONObject();
message2.put("test.key", "test.value3");
message2.put("test.key2", "test.value2");
List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
add(new BulkMessage("message1", message));
add(new BulkMessage("message2", message2));
}};
writer.write(SENSOR_NAME, config, messages);
writer.close();
ArrayList<String> expected = new ArrayList<>();
expected.add(message.toJSONString());
expected.add(message2.toJSONString());
Collections.sort(expected);
// Default to just putting it in the base folder + the sensor name
File outputFolder = new File(folder.getAbsolutePath() + "/" + SENSOR_NAME);
assertTrue(outputFolder.exists() && outputFolder.isDirectory());
assertEquals(1, outputFolder.listFiles().length);
for(File file : outputFolder.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
Collections.sort(lines);
assertEquals(expected, lines);
}
}
@Test
@SuppressWarnings("unchecked")
public void testWriteSingleFile() throws Exception {
String function = "FORMAT('test-%s/%s', test.key, test.key)";
WriterConfiguration config = buildWriterConfiguration(function);
FileNameFormat format = new DefaultFileNameFormat()
.withPath(folder.toString())
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key is the same
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
message.put("test.key2", "test.value2");
JSONObject message2 = new JSONObject();
message2.put("test.key", "test.value");
message2.put("test.key3", "test.value2");
List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
add(new BulkMessage<>("message1", message));
add(new BulkMessage<>("message2", message2));
}};
writer.write(SENSOR_NAME, config, messages);
writer.close();
ArrayList<String> expected = new ArrayList<>();
expected.add(message.toJSONString());
expected.add(message2.toJSONString());
Collections.sort(expected);
File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
assertTrue(outputFolder.exists() && outputFolder.isDirectory());
assertEquals(1, outputFolder.listFiles().length);
for(File file : outputFolder.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
Collections.sort(lines);
assertEquals(expected, lines);
}
}
@Test
@SuppressWarnings("unchecked")
public void testWriteMultipleFiles() throws Exception {
String function = "FORMAT('test-%s/%s', test.key, test.key)";
WriterConfiguration config = buildWriterConfiguration(function);
FileNameFormat format = new DefaultFileNameFormat()
.withPath(folder.toString())
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key is the same
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
message.put("test.key2", "test.value2");
JSONObject message2 = new JSONObject();
message2.put("test.key", "test.value2");
message2.put("test.key3", "test.value3");
List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
add(new BulkMessage("message1", message));
add(new BulkMessage("message2", message2));
}};
writer.write(SENSOR_NAME, config, messages);
writer.close();
ArrayList<String> expected1 = new ArrayList<>();
expected1.add(message.toJSONString());
Collections.sort(expected1);
File outputFolder1 = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
assertTrue(outputFolder1.exists() && outputFolder1.isDirectory());
assertEquals(1, outputFolder1.listFiles().length);
for(File file : outputFolder1.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
Collections.sort(lines);
assertEquals(expected1, lines);
}
ArrayList<String> expected2 = new ArrayList<>();
expected2.add(message2.toJSONString());
Collections.sort(expected2);
File outputFolder2 = new File(folder.getAbsolutePath() + "/test-test.value2/test.value2/");
assertTrue(outputFolder2.exists() && outputFolder2.isDirectory());
assertEquals(1, outputFolder2.listFiles().length);
for(File file : outputFolder2.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
Collections.sort(lines);
assertEquals(expected2, lines);
}
}
@Test
@SuppressWarnings("unchecked")
public void testWriteSingleFileWithNull() throws Exception {
String function = "FORMAT('test-%s/%s', test.key, test.key)";
WriterConfiguration config = buildWriterConfiguration(function);
FileNameFormat format = new DefaultFileNameFormat()
.withPath(folder.toString())
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key is the same
JSONObject message = new JSONObject();
message.put("test.key2", "test.value2");
List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
add(new BulkMessage("message1", message));
}};
writer.write(SENSOR_NAME, config,messages);
writer.close();
ArrayList<String> expected = new ArrayList<>();
expected.add(message.toJSONString());
Collections.sort(expected);
File outputFolder = new File(folder.getAbsolutePath() + "/test-null/null/");
assertTrue(outputFolder.exists() && outputFolder.isDirectory());
assertEquals(1, outputFolder.listFiles().length);
for(File file : outputFolder.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
Collections.sort(lines);
assertEquals(expected, lines);
}
}
@Test
@SuppressWarnings("unchecked")
public void testSingleFileIfNoStreamClosed() throws Exception {
String function = "FORMAT('test-%s/%s', test.key, test.key)";
WriterConfiguration config = buildWriterConfiguration(function);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
writer.init(new HashMap<String, String>(), config);
writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
add(new BulkMessage("message1", message));
}};
CountSyncPolicy basePolicy = new CountSyncPolicy(5);
ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
writer.write(SENSOR_NAME, config, messages);
writer.write(SENSOR_NAME, config, messages);
writer.close();
File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
// The message should show up twice, once in each file
ArrayList<String> expected = new ArrayList<>();
expected.add(message.toJSONString());
expected.add(message.toJSONString());
// Assert both messages are in the same file, because the stream stayed open
assertEquals(1, outputFolder.listFiles().length);
for (File file : outputFolder.listFiles()) {
List<String> lines = Files.readAllLines(file.toPath());
// One line per file
assertEquals(2, lines.size());
assertEquals(expected, lines);
}
}
protected WriterConfiguration buildWriterConfiguration(String function) {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
Map<String, Object> sensorIndexingConfig = new HashMap<>();
Map<String, Object> writerIndexingConfig = new HashMap<>();
writerIndexingConfig.put(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, function);
sensorIndexingConfig.put(WRITER_NAME, writerIndexingConfig);
indexingConfig.updateSensorIndexingConfig(SENSOR_NAME, sensorIndexingConfig);
return new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
}
private TopologyContext createTopologyContext(){
Map<Integer, String> taskToComponent = new HashMap<>();
taskToComponent.put(7, "Xcom");
return new TopologyContext(null, null, taskToComponent, null, null, null, null, null, 7, 6703, null, null, null, null, null, null);
}
}