| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.base.source.reader; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.connector.source.SourceSplit; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** An implementation of RecordsWithSplitIds to host all the records by splits. */ |
| @PublicEvolving |
| public class RecordsBySplits<E> implements RecordsWithSplitIds<E> { |
| |
| private final Set<String> finishedSplits; |
| |
| private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator; |
| |
| @Nullable private Iterator<E> recordsInCurrentSplit; |
| |
| public RecordsBySplits( |
| final Map<String, Collection<E>> recordsBySplit, final Set<String> finishedSplits) { |
| |
| this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator(); |
| this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits"); |
| } |
| |
| @Nullable |
| @Override |
| public String nextSplit() { |
| if (splitsIterator.hasNext()) { |
| final Map.Entry<String, Collection<E>> next = splitsIterator.next(); |
| recordsInCurrentSplit = next.getValue().iterator(); |
| return next.getKey(); |
| } else { |
| return null; |
| } |
| } |
| |
| @Nullable |
| @Override |
| public E nextRecordFromSplit() { |
| if (recordsInCurrentSplit == null) { |
| throw new IllegalStateException(); |
| } |
| |
| return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null; |
| } |
| |
| @Override |
| public Set<String> finishedSplits() { |
| return finishedSplits; |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * A utility builder to collect records in individual calls, rather than put a finished |
| * collection in the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor. |
| */ |
| public static class Builder<E> { |
| |
| private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>(); |
| private final Set<String> finishedSplits = new HashSet<>(2); |
| |
| /** |
| * Add the record from the given split ID. |
| * |
| * @param splitId the split ID the record was from. |
| * @param record the record to add. |
| */ |
| public void add(String splitId, E record) { |
| recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record); |
| } |
| |
| /** |
| * Add the record from the given source split. |
| * |
| * @param split the source split the record was from. |
| * @param record the record to add. |
| */ |
| public void add(SourceSplit split, E record) { |
| add(split.splitId(), record); |
| } |
| |
| /** |
| * Add multiple records from the given split ID. |
| * |
| * @param splitId the split ID given the records were from. |
| * @param records the records to add. |
| */ |
| public void addAll(String splitId, Collection<E> records) { |
| this.recordsBySplits.compute( |
| splitId, |
| (id, r) -> { |
| if (r == null) { |
| r = records; |
| } else { |
| r.addAll(records); |
| } |
| return r; |
| }); |
| } |
| |
| /** |
| * Add multiple records from the given source split. |
| * |
| * @param split the source split the records were from. |
| * @param records the records to add. |
| */ |
| public void addAll(SourceSplit split, Collection<E> records) { |
| addAll(split.splitId(), records); |
| } |
| |
| /** |
| * Mark the split with the given ID as finished. |
| * |
| * @param splitId the ID of the finished split. |
| */ |
| public void addFinishedSplit(String splitId) { |
| finishedSplits.add(splitId); |
| } |
| |
| /** |
| * Mark multiple splits with the given IDs as finished. |
| * |
| * @param splitIds the IDs of the finished splits. |
| */ |
| public void addFinishedSplits(Collection<String> splitIds) { |
| finishedSplits.addAll(splitIds); |
| } |
| |
| public RecordsBySplits<E> build() { |
| return new RecordsBySplits<>( |
| recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits, |
| finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits); |
| } |
| } |
| } |