blob: 84e33e529dadf24f4d2856025c06405bce3f1a2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.source;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.source.split.ValuesSourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
/**
 * A {@link SourceReader} implementation that reads data from a list.
 *
 * <p>The records are passed in as serialized byte arrays and decoded once in {@link #start()}.
 * Every assigned {@link ValuesSourceSplit} selects one decoded record through its index; the
 * reader emits that record and then asks the enumerator for another split.
 */
public class ValuesSourceReader implements SourceReader<RowData, ValuesSourceSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(ValuesSourceReader.class);

    /** The serialized records handed to the constructor; decoded in {@link #start()}. */
    private final List<byte[]> serializedElements;

    /** The serializer used to turn each byte array back into a {@link RowData}. */
    private final TypeSerializer<RowData> serializer;

    /** The context for this reader, to communicate with the enumerator. */
    private final SourceReaderContext context;

    /** The remaining splits that were assigned but not yet processed. */
    private final Queue<ValuesSourceSplit> remainingSplits;

    /**
     * The availability future. It is completed when splits are added or when the end of splits
     * is signalled, and it is replaced by a fresh future when {@link #pollNext} finds nothing to
     * emit.
     */
    private CompletableFuture<Void> availability;

    /** The decoded records; populated by {@link #start()}. */
    private List<RowData> elements;

    /** Set once {@link #notifyNoMoreSplits()} has been called. */
    private boolean noMoreSplits;

    /**
     * Creates a reader over the given serialized records.
     *
     * @param serializedElements the records in serialized form, one byte array per record
     * @param serializer the serializer used to decode the records
     * @param context the reader context used to send split requests
     */
    public ValuesSourceReader(
            List<byte[]> serializedElements,
            TypeSerializer<RowData> serializer,
            SourceReaderContext context) {
        this.context = context;
        this.serializer = serializer;
        this.serializedElements = serializedElements;
        this.remainingSplits = new ArrayDeque<>();
        this.availability = new CompletableFuture<>();
    }

    /**
     * Decodes all serialized records and, if no split is queued yet, sends a split request.
     *
     * @throws TableException if a record cannot be deserialized
     */
    @Override
    public void start() {
        elements = new ArrayList<>();
        for (byte[] serialized : serializedElements) {
            elements.add(decode(serialized));
        }

        // request a split if we don't have one
        boolean hasQueuedSplit = !remainingSplits.isEmpty();
        if (!hasQueuedSplit) {
            context.sendSplitRequest();
        }
    }

    /** Deserializes one record, wrapping any failure in a {@link TableException}. */
    private RowData decode(byte[] bytes) {
        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes)) {
            DataInputView view = new DataInputViewStreamWrapper(byteStream);
            return serializer.deserialize(view);
        } catch (Exception cause) {
            throw new TableException(
                    "Failed to deserialize an element from the source. "
                            + "If you are using user-defined serialization (Value and Writable types), check the "
                            + "serialization functions.\nSerializer is "
                            + serializer,
                    cause);
        }
    }

    /**
     * Emits the record selected by the next queued split, if there is one.
     *
     * @param output the output the record is collected into
     * @return {@link InputStatus#MORE_AVAILABLE} after emitting a record, {@link
     *     InputStatus#END_OF_INPUT} when the queue is empty and no more splits will come,
     *     otherwise {@link InputStatus#NOTHING_AVAILABLE}
     */
    @Override
    public InputStatus pollNext(ReaderOutput<RowData> output) throws Exception {
        final ValuesSourceSplit split = remainingSplits.poll();

        if (split == null) {
            if (noMoreSplits) {
                return InputStatus.END_OF_INPUT;
            }
            // ensure we are not called in a loop by resetting the availability future
            if (availability.isDone()) {
                availability = new CompletableFuture<>();
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        final RowData record = elements.get(split.getIndex());
        output.collect(record);
        // request another split
        context.sendSplitRequest();
        return InputStatus.MORE_AVAILABLE;
    }

    /**
     * Always returns an empty list, so splits that are queued but not yet polled are not part of
     * the snapshot.
     *
     * <p>NOTE(review): recovery presumably relies on the enumerator handing the splits out again
     * - confirm against the enumerator implementation.
     */
    @Override
    public List<ValuesSourceSplit> snapshotState(long checkpointId) {
        return Collections.emptyList();
    }

    /** Returns the current availability future. */
    @Override
    public CompletableFuture<Void> isAvailable() {
        return availability;
    }

    /** Queues the given splits and completes the availability future. */
    @Override
    public void addSplits(List<ValuesSourceSplit> splits) {
        remainingSplits.addAll(splits);
        // set availability so that pollNext is actually called
        availability.complete(null);
    }

    /** Records that no further splits will be assigned and completes the availability future. */
    @Override
    public void notifyNoMoreSplits() {
        noMoreSplits = true;
        // set availability so that pollNext is actually called
        availability.complete(null);
    }

    /** Nothing to release. */
    @Override
    public void close() throws Exception {}

    /** Logs the id of the completed checkpoint. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.info("checkpoint {} finished.", checkpointId);
    }
}