/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.file;

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.errors.ConnectException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.connect.file.FileConfig.FILE_CONFIG;
import static org.apache.rocketmq.connect.file.FileConstants.LINE;
import static org.apache.rocketmq.connect.file.FileConstants.NEXT_POSITION;
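
/**
 * Source task that reads a file (or stdin when no file name is configured) line by line and
 * converts every line into a {@link ConnectRecord}, recording the next read position in the
 * offset storage so the task can resume from where it stopped after a restart.
 */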
public class FileSourceTask extends SourceTask {
private final Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
private FileConfig fileConfig;
private InputStream stream;
private BufferedReader reader = null;
private char[] buffer = new char[1024];
private int offset = 0;
private int batchSize = FileSourceConnector.DEFAULT_TASK_BATCH_SIZE;
private Long streamOffset;
private KeyValue config;
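/**
 * Lazily opens the configured file on first use, seeks to the last recorded position, and then
 * reads complete lines into ConnectRecords until the batch size is reached or no more data is ready.
 */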
@Override public List<ConnectRecord> poll() {
log.info("Start a poll stream is null:{}", stream == null);
if (stream == null) {
try {
stream = Files.newInputStream(Paths.get(fileConfig.getFilename()));
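// Look up the offset recorded for this file's partition so reading resumes where the previous run stopped.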
RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(offsetKey(FileConstants.getPartition(fileConfig.getFilename())));
if (positionInfo != null && null != positionInfo.getOffset()) {
log.info("positionInfo is not null!");
Map<String, ?> lastOffsetMap = (Map<String, String>) positionInfo.getOffset();
Object lastRecordedOffset = lastOffsetMap.get(NEXT_POSITION);
if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = Long.valueOf(String.valueOf(lastRecordedOffset));
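// InputStream.skip() may skip fewer bytes than requested, so keep skipping until the recorded position is reached.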
while (skipLeft > 0) {
try {
long skipped = stream.skip(skipLeft);
skipLeft -= skipped;
} catch (IOException e) {
log.error("Error while trying to seek to previous offset in file {}: ", fileConfig.getFilename(), e);
throw new ConnectException(e);
}
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
streamOffset = (lastRecordedOffset != null) ? Long.valueOf(String.valueOf(lastRecordedOffset)) : 0L;
} else {
log.info("positionInfo is null!");
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
log.debug("Opened {} for reading", logFilename());
} catch (NoSuchFileException e) {
log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
synchronized (this) {
try {
this.wait(1000);
} catch (InterruptedException e1) {
log.error("Interrupt error .", e1);
}
}
return null;
} catch (IOException e) {
log.error("Error while trying to open file {}: ", fileConfig.getFilename(), e);
throw new ConnectException(e);
}
}
try {
final BufferedReader readerCopy;
synchronized (this) {
readerCopy = reader;
}
if (readerCopy == null) {
return null;
}
List<ConnectRecord> records = null;
int nread = 0;
while (readerCopy.ready()) {
nread = readerCopy.read(buffer, offset, buffer.length - offset);
log.trace("Read {} bytes from {}", nread, logFilename());
if (nread > 0) {
offset += nread;
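// The buffer is full, so double its capacity to make room for lines longer than the current buffer.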
if (offset == buffer.length) {
char[] newbuf = new char[buffer.length * 2];
System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
buffer = newbuf;
}
String line;
do {
line = extractLine();
if (line != null) {
log.trace("Read a line from {}", logFilename());
if (records == null) {
records = new ArrayList<>();
}
List<Field> fields = new ArrayList<Field>();
Schema schema = new Schema(fileConfig.getFilename() + LINE, FieldType.STRING, fields);
final Field field = new Field(0, FileConstants.FILE_LINE_CONTENT, SchemaBuilder.string().name(fileConfig.getFilename() + LINE).build());
fields.add(field);
schema.setFields(fields);
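// Attach the file partition and the next read position so the runtime can checkpoint and later resume from this record.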
ConnectRecord connectRecord = new ConnectRecord(offsetKey(fileConfig.getFilename()), offsetValue(streamOffset), System.currentTimeMillis(), schema, line);
connectRecord.addExtension("topic", fileConfig.getTopic());
records.add(connectRecord);
if (records.size() >= batchSize) {
return records;
}
}
} while (line != null);
}
}
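// Nothing was read on the last attempt; wait briefly instead of busy-polling (stop() calls notify() to wake this up early).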
if (nread <= 0) {
synchronized (this) {
this.wait(1000);
}
}
return records;
} catch (IOException e) {
// The underlying stream may have been closed by stop(); log and fall through to return null.
log.warn("IOException while reading from {}: ", logFilename(), e);
} catch (InterruptedException e) {
log.error("Interrupt error .", e);
}
return null;
}
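/**
 * Extracts the next complete line (terminated by \n, \r or \r\n) from the buffer, compacts the
 * remaining characters to the front of the buffer, and advances streamOffset past the consumed line and terminator.
 */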
private String extractLine() {
int until = -1, newStart = -1;
for (int i = 0; i < offset; i++) {
if (buffer[i] == '\n') {
until = i;
newStart = i + 1;
break;
} else if (buffer[i] == '\r') {
// We need to check for \r\n, so we must skip this if we can't check the next char
if (i + 1 >= offset)
return null;
until = i;
newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
break;
}
}
if (until != -1) {
String result = new String(buffer, 0, until);
System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
offset = offset - newStart;
if (streamOffset != null)
streamOffset += newStart;
return result;
} else {
return null;
}
}
@Override public void validate(KeyValue config) { }
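/**
 * Loads the file configuration; when no file name is given the task reads from stdin immediately,
 * otherwise the file is opened lazily on the first call to {@link #poll()}.
 */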
@Override public void start(KeyValue config) {
this.config = config;
fileConfig = new FileConfig();
fileConfig.load(config);
log.info("fileName is:{}", fileConfig.getFilename());
if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) {
stream = System.in;
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
}
}
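/**
 * Closes the underlying stream (unless it is stdin) and wakes up any poll() call blocked in wait().
 */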
@Override public void stop() {
log.trace("Stopping");
synchronized (this) {
try {
if (stream != null && stream != System.in) {
stream.close();
log.trace("Closed input stream");
}
} catch (IOException e) {
log.error("Failed to close FileStreamSourceTask stream: ", e);
}
this.notify();
}
}
private String logFilename() {
return (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) ? "stdin" : fileConfig.getFilename();
}
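// The record partition identifies the source file; the record offset stores the next position to read from it.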
private RecordPartition offsetKey(String filename) {
Map<String, String> map = new HashMap<>();
map.put(FILE_CONFIG, filename);
return new RecordPartition(map);
}
private RecordOffset offsetValue(Long pos) {
Map<String, String> map = new HashMap<>();
map.put(FileConstants.NEXT_POSITION, String.valueOf(pos));
return new RecordOffset(map);
}
}