blob: eba4fb411f17c52ec82da7d5711a9664ac00e657 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.sink.file;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.TestCommon;
import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSinkFileITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkFileITCase.class);
private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath();
private static final String FILENAME = "test.txt";
private final int expect = 1;
@BeforeEach
public void setUp() {
cleanup();
}
@AfterEach
public void tearDown() {
cleanup();
}
private void cleanup() {
File doneFile = new File(SINK_DIR, FILENAME + ".done");
if (doneFile.exists()) {
doneFile.delete();
}
}
private void putRecords() {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
for (int i = 0; i < expect; i++) {
try {
kafkaClient.produce(TestCommon.getDefaultTestTopic(this.getClass()), "test");
} catch (ExecutionException e) {
LOG.error("Unable to produce messages: {}", e.getMessage(), e);
} catch (InterruptedException e) {
break;
}
}
}
public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
putRecords();
LOG.debug("Created the consumer ... About to receive messages");
File sinkFile = new File(SINK_DIR, FILENAME);
File doneFile = new File(SINK_DIR, FILENAME + ".done");
waitForFile(sinkFile, doneFile);
assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath()));
checkFileContents(sinkFile);
}
@Test
@Timeout(90)
public void testBasicSendReceive() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
.withTopics(TestCommon.getDefaultTestTopic(this.getClass()))
.withDirectoryName(SINK_DIR)
.withFileName(FILENAME)
.withDoneFileName("${file:name}.done");
runTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("HTTP test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
@Test
@Timeout(90)
public void testBasicSendReceiveUsingUrl() {
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
.withTopics(TestCommon.getDefaultTestTopic(this.getClass()))
.withUrl(SINK_DIR)
.append("fileName", FILENAME)
.append("doneFileName", "${file:name}.done")
.buildUrl();
runTest(connectorPropertyFactory);
} catch (Exception e) {
LOG.error("HTTP test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
}
private void checkFileContents(File sinkFile) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(sinkFile));
int i = 0;
String line;
do {
line = reader.readLine();
if (line != null) {
assertEquals("test", line, String.format("Unexpected data: %s", line));
i++;
}
} while (line != null);
assertEquals(expect, i, "Did not receive the same amount of messages that were sent");
}
private void waitForFile(File sinkFile, File doneFile) throws IOException, InterruptedException {
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = sinkFile.getParentFile().toPath();
if (doneFile.exists()) {
return;
}
// We watch for both the file creation and truncation
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
int retries = 30;
do {
WatchKey watchKey = watchService.poll(1, TimeUnit.SECONDS);
if (watchKey == null) {
continue;
}
for (WatchEvent<?> event : watchKey.pollEvents()) {
/*
It should return a Path object for ENTRY_CREATE and ENTRY_MODIFY events
*/
Object context = event.context();
if (!(context instanceof Path)) {
LOG.warn("Received an unexpected event of kind {} for context {}", event.kind(), event.context());
continue;
}
Path contextPath = (Path) context;
if (contextPath.toString().equals(doneFile.getName())) {
LOG.info("Sink file at the build path {} had a matching event of type: {}", sinkFile.getPath(),
event.kind());
return;
} else {
LOG.debug("Ignoring a watch event at build path {} of type {} for file: {}", sinkFile.getPath(),
event.kind(), contextPath.getFileName());
}
}
watchKey.reset();
retries--;
} while (!doneFile.exists() && retries > 0);
}
}