| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| package org.apache.edgent.connectors.file; |
| |
| import java.io.File; |
| import java.nio.file.WatchService; |
| import java.util.Comparator; |
| |
| import org.apache.edgent.connectors.file.runtime.DirectoryWatcher; |
| import org.apache.edgent.connectors.file.runtime.IFileWriterPolicy; |
| import org.apache.edgent.connectors.file.runtime.TextFileReader; |
| import org.apache.edgent.connectors.file.runtime.TextFileWriter; |
| import org.apache.edgent.function.BiFunction; |
| import org.apache.edgent.function.Function; |
| import org.apache.edgent.function.Supplier; |
| import org.apache.edgent.topology.TSink; |
| import org.apache.edgent.topology.TStream; |
| import org.apache.edgent.topology.TopologyElement; |
| |
| /** |
| * {@code FileStreams} is a connector for integrating with file system objects. |
| * <p> |
| * File stream operations include: |
| * <ul> |
| * <li>Write tuples to text files - {@link #textFileWriter(TStream, Supplier, Supplier) textFileWriter}</li> |
| * <li>Watch a directory for new files - {@link #directoryWatcher(TopologyElement, Supplier) directoryWatcher}</li> |
| * <li>Create tuples from text files - {@link #textFileReader(TStream, Function, BiFunction) textFileReader}</li> |
| * </ul> |
| */ |
| public class FileStreams { |
| @SuppressWarnings("unused") |
| private static final FileStreams forCodeCoverage = new FileStreams(); |
| private FileStreams() {}; |
| |
| /** |
| * Declare a stream containing the absolute pathname of |
| * newly created file names from watching {@code directory}. |
| * <p> |
| * This is the same as {@code directoryWatcher(t, () -> dir, null)}. |
| * |
| * @param te org.apache.edgent.org.apache.edgent.topology element whose org.apache.edgent.org.apache.edgent.topology the watcher will be added to |
| * @param directory |
| * Name of the directory to watch. |
| * @return Stream containing absolute pathnames of newly created files in |
| * {@code directory}. |
| */ |
| public static TStream<String> directoryWatcher(TopologyElement te, |
| Supplier<String> directory) { |
| return directoryWatcher(te, directory, null); |
| } |
| |
| /** |
| * Declare a stream containing the absolute pathname of |
| * newly created file names from watching {@code directory}. |
| * <p> |
| * Hidden files (java.io.File.isHidden()==true) are ignored. |
| * This is compatible with {@code textFileWriter}. |
| * <p> |
| * Sample use: |
| * <pre>{@code |
| * String dir = "/some/directory/path"; |
| * Topology t = ... |
| * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir, null); |
| * }</pre> |
| * <p> |
| * The order of the files in the stream is dictated by a {@link Comparator}. |
| * The default comparator orders files by {@link File#lastModified()} values. |
| * There are no guarantees on the processing order of files that |
| * have the same lastModified value. |
| * Note, lastModified values are subject to filesystem timestamp |
| * quantization - e.g., 1second. |
| * <p> |
| * Note: due to the asynchronous nature of things, if files in the |
| * directory may be removed, the receiver of a tuple with a "new" file |
| * pathname may need to be prepared for the pathname to no longer be |
| * valid when it receives the tuple or during its processing of the tuple. |
| * <p> |
| * The behavior on MacOS may be unsavory, even as recent as Java8, as |
| * MacOs Java lacks a native implementation of {@link WatchService}. |
| * The result can be a delay in detecting newly created files (e.g., 10sec) |
| * as well not detecting rapid deletion and recreation of a file. |
| * |
| * @param te org.apache.edgent.org.apache.edgent.topology element whose org.apache.edgent.org.apache.edgent.topology the watcher will be added to |
| * @param directory |
| * Name of the directory to watch. |
| * @param comparator |
| * Comparator to use to order newly seen file pathnames. |
| * May be null. |
| * @return Stream containing absolute pathnames of newly created files in |
| * {@code directory}. |
| */ |
| public static TStream<String> directoryWatcher(TopologyElement te, |
| Supplier<String> directory, Comparator<File> comparator) { |
| return te.topology().source(() -> new DirectoryWatcher(directory, comparator)); |
| } |
| |
| /** |
| * Declare a stream containing the lines read from the files |
| * whose pathnames correspond to each tuple on the {@code pathnames} |
| * stream. |
| * <p> |
| * This is the same as {@code textFileReader(pathnames, null, null)} |
| * <p> |
| * Sample uses: |
| * |
| * <pre>{@code |
| * // continuously watch a directory for new files and process each one once |
| * String dir = "/some/directory/path"; |
| * Topology t = ... |
| * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir); |
| * TStream<String> contents = FileStreams.textFileReader(pathnames); |
| * contents.print(); |
| * }</pre> |
| * |
| * <pre>{@code |
| * // periodically process one or more files |
| * Supplier<List<String>> myPathnamesSupplier = () -> } { |
| * {@code |
| * // implementation of List<String> get() - return a list of pathnames |
| * // return Arrays.asList("/some/pathname"); // a fixed list |
| * // return Arrays.asList(new File("/some/dir").list()); // query a directory |
| * // or query some other object(s) for a list ... |
| * } |
| * }; |
| * {@code |
| * Topology t = ... |
| * TStream<String> pathnames = t.poll(myPathnamesSupplier, 30, TimeUnit.SECONDS) |
| * .flatMap(tuple -> tuple); |
| * TStream<String> contents = FileStreams.textFileReader(pathnames); |
| * contents.print(); |
| * }</pre> |
| * |
| * @param pathnames |
| * Stream containing pathnames of files to read. |
| * @return Stream containing lines from the files. |
| */ |
| public static TStream<String> textFileReader(TStream<String> pathnames) { |
| return textFileReader(pathnames, null, null); |
| } |
| |
| /** |
| * Declare a stream containing the lines read from the files |
| * whose pathnames correspond to each tuple on the {@code pathnames} |
| * stream. |
| * <p> |
| * All files are assumed to be encoded in UTF-8. The lines are |
| * output in the order they appear in each file, with the first line of |
| * a file appearing first. A file is not subsequently monitored for |
| * additional lines. |
| * <p> |
| * If a file can not be read, e.g., a file doesn't exist at that pathname |
| * or the pathname is for a directory, |
| * an error will be logged. |
| * <p> |
| * Optional {@code preFn} and {@code postFn} functions may be supplied. |
| * These are called prior to processing a tuple (pathname) and after |
| * respectively. They provide a way to encode markers in the generated |
| * stream. |
| * <p> |
| * Sample use: |
| * <pre>{@code |
| * // watch a directory for files, creating a stream with the contents of |
| * // each file. Use a preFn to include a file separator marker in the |
| * // stream. Use a postFn to delete a file once it's been processed. |
| * String dir = "/some/directory/path"; |
| * Topology t = ... |
| * TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> dir); |
| * TStream<String> contents = FileStreams.textFileReader( |
| * pathnames, |
| * path -> { return "###<PATH-MARKER>### " + path }, |
| * (path,exception) -> { new File(path).delete(), return null; } |
| * ); |
| * contents.print(); |
| * }</pre> |
| * |
| * @param pathnames |
| * Stream containing pathnames of files to read. |
| * @param preFn |
| * Pre-visit {@code Function<String,String>}. |
| * The input is the pathname. |
| * The result, when non-null, is added to the output stream. |
| * The function may be null. |
| * @param postFn |
| * Post-visit {@code BiFunction<String,Exception,String>}. |
| * The input is the pathname and an exception. The exception |
| * is null if there were no errors. |
| * The result, when non-null, is added to the output stream. |
| * The function may be null. |
| * @return Stream containing lines from the files. |
| */ |
| public static TStream<String> textFileReader(TStream<String> pathnames, |
| Function<String,String> preFn, BiFunction<String,Exception,String> postFn) { |
| |
| TextFileReader reader = new TextFileReader(); |
| reader.setPre(preFn); |
| reader.setPost(postFn); |
| return pathnames.pipe(reader); |
| } |
| |
| /** |
| * Write the contents of a stream to files. |
| * <p> |
| * The default {@link FileWriterPolicy} is used. |
| * <p> |
| * This is the same as {@code textFileWriter(contents, basePathname, null)}. |
| * <p> |
| * Sample use: |
| * <pre>{@code |
| * // write a stream of LogEvent to files, using the default |
| * // file writer policy |
| * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS |
| * TStream<MyLogEvent> events = ... |
| * TStream<String> stringEvents = events.map(event -> event.toString()); |
| * FileStreams.textFileWriter(stringEvents, () -> basePathname); |
| * }</pre> |
| * @param contents the lines to write |
| * @param basePathname the base pathname of the created files |
| * @return a TSink |
| */ |
| public static TSink<String> textFileWriter(TStream<String> contents, |
| Supplier<String> basePathname) { |
| return textFileWriter(contents, basePathname, null); |
| } |
| |
| /** |
| * Write the contents of a stream to files subject to the control |
| * of a file writer policy. |
| * <p> |
| * A separate policy instance must be used for invocation. |
| * A default {@link FileWriterPolicy} is used if a policy is not specified. |
| * <p> |
| * Sample use: |
| * <pre>{@code |
| * // write a stream of LogEvent to files using a policy of: |
| * // no additional flush, 100 events per file, retain 5 files |
| * IFileWriterPolicy<String> policy = new FileWriterPolicy<String>( |
| * FileWriterFlushConfig.newImplicitConfig(), |
| * FileWriterCycleConfig.newCountBasedConfig(100), |
| * FileWriterRetentionConfig.newFileCountBasedConfig(5) |
| * ); |
| * String basePathname = "/myLogDir/LOG"; // yield LOG_YYYYMMDD_HHMMSS |
| * TStream<MyLogEvent> events = ... |
| * TStream<String> stringEvents = events.map(event -> event.toString()); |
| * FileStreams.textFileWriter(stringEvents, () -> basePathname, () -> policy); |
| * }</pre> |
| * @param contents the lines to write |
| * @param basePathname the base pathname of the created files |
| * @param policy the policy to use. may be null. |
| * @return a TSink |
| * @see FileWriterPolicy |
| */ |
| public static TSink<String> textFileWriter(TStream<String> contents, |
| Supplier<String> basePathname, Supplier<IFileWriterPolicy<String>> policy) { |
| if (policy == null) { |
| IFileWriterPolicy<String> defaultPolicy = new FileWriterPolicy<String>(){}; |
| policy = () -> defaultPolicy; |
| } |
| return contents.sink(new TextFileWriter(basePathname, policy)); |
| } |
| } |