blob: 951c5277e0991860a9eb4c4f0ba77a10f6b47850 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.mongodb.source.enumerator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
/** The enumerator class for {@link MongoSource}. */
@Internal
public class MongoSourceEnumerator
implements SplitEnumerator<MongoSourceSplit, MongoSourceEnumState> {
private static final Logger LOG = LoggerFactory.getLogger(MongoSourceEnumerator.class);
private final Boundedness boundedness;
private final SplitEnumeratorContext<MongoSourceSplit> context;
private final MongoSplitAssigner splitAssigner;
private final TreeSet<Integer> readersAwaitingSplit;
public MongoSourceEnumerator(
Boundedness boundedness,
SplitEnumeratorContext<MongoSourceSplit> context,
MongoSplitAssigner splitAssigner) {
this.boundedness = boundedness;
this.context = context;
this.splitAssigner = splitAssigner;
this.readersAwaitingSplit = new TreeSet<>();
}
@Override
public void start() {
splitAssigner.open();
}
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
readersAwaitingSplit.add(subtaskId);
assignSplits();
}
@Override
public void addSplitsBack(List<MongoSourceSplit> splits, int subtaskId) {
LOG.debug("Mongo Source Enumerator adds splits back: {}", splits);
splitAssigner.addSplitsBack(splits);
}
@Override
public void addReader(int subtaskId) {
LOG.debug("Adding reader {} to MongoSourceEnumerator.", subtaskId);
}
private void assignSplits() {
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}
// close idle readers
if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
LOG.info(
"All scan splits have been assigned, closing idle reader {}", nextAwaiting);
continue;
}
Optional<MongoSourceSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
final MongoSourceSplit mongoSplit = split.get();
context.assignSplit(mongoSplit, nextAwaiting);
awaitingReader.remove();
LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
break;
} else {
// there is no available splits by now, skip assigning
break;
}
}
}
@Override
public MongoSourceEnumState snapshotState(long checkpointId) {
return splitAssigner.snapshotState(checkpointId);
}
@Override
public void close() throws IOException {
splitAssigner.close();
}
}