| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.translation.spark.common.source.micro; |
| |
| import org.apache.seatunnel.api.source.Collector; |
| import org.apache.seatunnel.api.source.SeaTunnelSource; |
| import org.apache.seatunnel.api.source.SourceReader; |
| import org.apache.seatunnel.api.source.SourceSplit; |
| import org.apache.seatunnel.api.table.type.SeaTunnelRow; |
| import org.apache.seatunnel.translation.source.BaseSourceFunction; |
| import org.apache.seatunnel.translation.source.CoordinatedSource; |
| import org.apache.seatunnel.translation.spark.common.InternalRowCollector; |
| import org.apache.seatunnel.translation.spark.common.ReaderState; |
| |
| import java.io.Serializable; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
/**
 * Micro-batch partition reader that drives a coordinated source: all {@code parallelism}
 * reader subtasks run inside this single partition, each polling into its own
 * {@link InternalRowCollector}. Keeping one collector (and one checkpoint-lock object) per
 * subtask lets {@link #virtualCheckpoint()} lock every reader before snapshotting state.
 */
public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPartitionReader {
    // One collector per reader subtask, keyed by subtask index in [0, parallelism).
    protected final Map<Integer, InternalRowCollector> collectorMap;

    /**
     * @param source             the coordinated SeaTunnel source to read from
     * @param parallelism        number of reader subtasks hosted in this partition
     * @param subtaskId          id of this Spark-side subtask
     * @param checkpointId       id of the checkpoint to start from
     * @param checkpointInterval interval between checkpoints (unit defined by the superclass)
     * @param checkpointPath     base path where checkpoint state is stored
     * @param hdfsRoot           HDFS root URI used for state storage
     * @param hdfsUser           HDFS user used for state storage
     */
    public CoordinatedMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
                                                Integer parallelism,
                                                Integer subtaskId,
                                                Integer checkpointId,
                                                Integer checkpointInterval,
                                                String checkpointPath,
                                                String hdfsRoot,
                                                String hdfsUser) {
        super(source, parallelism, subtaskId, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
        this.collectorMap = new HashMap<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // Each collector gets its own dedicated lock object so internalCheckpoint()
            // can acquire the readers' checkpoint locks independently, one per subtask.
            collectorMap.put(i, new InternalRowCollector(handover, new Object(), source.getProducedType()));
        }
    }

    /**
     * Performs a checkpoint across all reader subtasks by recursively acquiring every
     * collector's checkpoint lock before snapshotting (see {@link #internalCheckpoint}).
     */
    @Override
    public void virtualCheckpoint() {
        try {
            internalCheckpoint(collectorMap.values().iterator(), 0);
        } catch (Exception e) {
            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
        }
    }

    /**
     * Recursively nests a {@code synchronized} block per collector so that the checkpoint
     * body only runs once ALL collectors' checkpoint locks are held (depth == parallelism).
     * The early return on the unwind path prevents the body from executing again at each
     * shallower recursion level.
     *
     * @param iterator remaining collectors whose locks still need to be acquired
     * @param loop     number of locks acquired so far (recursion depth)
     * @throws Exception if snapshotting, saving state, or notification fails
     */
    private void internalCheckpoint(Iterator<InternalRowCollector> iterator, int loop) throws Exception {
        if (!iterator.hasNext()) {
            return;
        }
        synchronized (iterator.next().getCheckpointLock()) {
            internalCheckpoint(iterator, ++loop);
            if (loop != this.parallelism) {
                // Avoid backtracking calls
                return;
            }
            // All reader locks held: wait for in-flight rows to drain before snapshotting.
            while (!handover.isEmpty()) {
                Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
            }
            // Block #next() method
            synchronized (handover) {
                final int currentCheckpoint = checkpointId;
                ReaderState readerState = snapshotState();
                saveState(readerState, currentCheckpoint);
                internalSource.notifyCheckpointComplete(currentCheckpoint);
                // Ends this micro-batch; the superclass presumably observes this flag —
                // TODO(review): confirm lifecycle handling in ParallelMicroBatchPartitionReader.
                running = false;
            }
        }
    }

    @Override
    protected String getEnumeratorThreadName() {
        return "coordinated-split-enumerator-executor";
    }

    /**
     * Creates the coordinated source wrapper. NOTE(review): restoredState is passed as
     * {@code null} here — state restoration appears to be handled elsewhere; confirm.
     */
    @Override
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalCoordinatedSource<>(source,
            null,
            parallelism);
    }

    /**
     * Inner coordinated source that fans each reader subtask out to the executor service
     * and routes each reader's rows into its per-subtask collector from the outer class.
     *
     * @param <SplitT> split type produced by the enumerator
     * @param <StateT> reader state type used for checkpointing
     */
    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT extends Serializable> extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {

        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
            super(source, restoredState, parallelism);
        }

        /**
         * Starts one polling task per reader on {@code executorService}, runs the split
         * enumerator, then blocks until {@code running} is cleared (e.g. by a checkpoint
         * or by {@link #handleNoMoreElement(int)}).
         *
         * @param collector unused here; rows go through the per-subtask collectors instead
         */
        @Override
        public void run(Collector<SeaTunnelRow> collector) throws Exception {
            readerMap.entrySet().parallelStream().forEach(entry -> {
                final AtomicBoolean flag = readerRunningMap.get(entry.getKey());
                final SourceReader<SeaTunnelRow, SplitT> reader = entry.getValue();
                // Route this reader's output to its dedicated collector from the outer class.
                final Collector<SeaTunnelRow> rowCollector = collectorMap.get(entry.getKey());
                executorService.execute(() -> {
                    while (flag.get()) {
                        try {
                            reader.pollNext(rowCollector);
                            Thread.sleep(SLEEP_TIME_INTERVAL);
                        } catch (Exception e) {
                            // Stop both this reader and the whole source on failure; the
                            // rethrown RuntimeException surfaces on the executor's thread.
                            this.running = false;
                            flag.set(false);
                            throw new RuntimeException(e);
                        }
                    }
                });
            });
            splitEnumerator.run();
            // Keep this thread alive while readers work; 'running' is flipped off by a
            // checkpoint, a reader failure, or end-of-data via handleNoMoreElement.
            while (this.running) {
                Thread.sleep(SLEEP_TIME_INTERVAL);
            }
        }

        /**
         * When the coordinated source reports that all subtasks are done, also stop the
         * enclosing partition reader so the micro-batch can finish.
         */
        @Override
        protected void handleNoMoreElement(int subtaskId) {
            super.handleNoMoreElement(subtaskId);
            if (!this.running) {
                CoordinatedMicroBatchPartitionReader.this.running = false;
            }
        }
    }
}