blob: 02d1e024a0d431645de719eb416f1191b1d1299e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.file.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
import java.util.List;
public abstract class BaseFileSource implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit, FileSourceState> {
protected SeaTunnelRowType rowType;
protected ReadStrategy readStrategy;
protected HadoopConf hadoopConf;
protected List<String> filePaths;
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return rowType;
}
@Override
public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
return new BaseFileSourceReader(readStrategy, hadoopConf, readerContext);
}
@Override
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) throws Exception {
return new FileSourceSplitEnumerator(enumeratorContext, filePaths);
}
@Override
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext, FileSourceState checkpointState) throws Exception {
return new FileSourceSplitEnumerator(enumeratorContext, filePaths, checkpointState);
}
}