blob: 83ca10ccafef00bf49586d3126f92cabc4a2e941 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.translation.spark.common.source.micro;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.CoordinatedSource;
import org.apache.seatunnel.translation.spark.common.InternalRowCollector;
import org.apache.seatunnel.translation.spark.common.ReaderState;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Micro-batch partition reader for sources that use a coordinated (centralized) split
 * enumerator. Unlike the parallel variant, all {@code parallelism} subtask readers run
 * inside this single partition reader; each subtask gets its own {@link InternalRowCollector}
 * (with its own checkpoint lock) while all collectors feed the shared {@code handover}
 * queue inherited from {@link ParallelMicroBatchPartitionReader}.
 */
public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPartitionReader {
    // Keyed by subtask index [0, parallelism). Each collector owns a distinct lock object
    // so internalCheckpoint() can acquire them one by one.
    protected final Map<Integer, InternalRowCollector> collectorMap;

    /**
     * Creates a coordinated micro-batch partition reader.
     *
     * @param source             the SeaTunnel source to read from
     * @param parallelism        number of subtask readers hosted by this partition reader
     * @param subtaskId          id of this Spark partition/subtask
     * @param checkpointId       checkpoint id to start from
     * @param checkpointInterval interval between virtual checkpoints
     * @param checkpointPath     path where reader state is persisted
     * @param hdfsRoot           HDFS root used for state storage
     * @param hdfsUser           HDFS user used for state storage
     */
    public CoordinatedMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
                                                Integer parallelism,
                                                Integer subtaskId,
                                                Integer checkpointId,
                                                Integer checkpointInterval,
                                                String checkpointPath,
                                                String hdfsRoot,
                                                String hdfsUser) {
        super(source, parallelism, subtaskId, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
        this.collectorMap = new HashMap<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // Each collector gets a fresh Object as its checkpoint lock; all share 'handover'.
            collectorMap.put(i, new InternalRowCollector(handover, new Object(), source.getProducedType()));
        }
    }

    /**
     * Performs a checkpoint by recursively acquiring every collector's checkpoint lock,
     * draining the handover queue, snapshotting state, and stopping the reader.
     */
    @Override
    public void virtualCheckpoint() {
        try {
            internalCheckpoint(collectorMap.values().iterator(), 0);
        } catch (Exception e) {
            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
        }
    }

    /**
     * Recursively synchronizes on each collector's checkpoint lock (one stack frame per
     * collector) so that all subtask readers are paused at once. Only the innermost frame
     * — when {@code loop == parallelism}, i.e. all locks are held — performs the actual
     * checkpoint work; outer frames return immediately on unwind.
     *
     * @param iterator iterator over the per-subtask collectors whose locks remain to be taken
     * @param loop     number of locks acquired so far (depth of the recursion)
     */
    private void internalCheckpoint(Iterator<InternalRowCollector> iterator, int loop) throws Exception {
        if (!iterator.hasNext()) {
            // No collectors left to lock; nothing to do at this depth.
            return;
        }
        synchronized (iterator.next().getCheckpointLock()) {
            internalCheckpoint(iterator, ++loop);
            if (loop != this.parallelism) {
                // Avoid backtracking calls: the checkpoint body below already ran in the
                // innermost frame, so outer frames just release their lock and return.
                return;
            }
            // All collector locks held; wait for downstream to drain buffered rows.
            while (!handover.isEmpty()) {
                Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
            }
            // Block #next() method
            synchronized (handover) {
                final int currentCheckpoint = checkpointId;
                ReaderState readerState = snapshotState();
                saveState(readerState, currentCheckpoint);
                internalSource.notifyCheckpointComplete(currentCheckpoint);
                // Stop the outer read loop; this micro-batch is complete.
                running = false;
            }
        }
    }

    @Override
    protected String getEnumeratorThreadName() {
        return "coordinated-split-enumerator-executor";
    }

    @Override
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        // NOTE(review): restoredState is always null here — state restore appears to be
        // handled elsewhere (or unsupported on this path); confirm against callers.
        return new InternalCoordinatedSource<>(source,
            null,
            parallelism);
    }

    /**
     * Coordinated source wrapper that routes each subtask reader's rows to the matching
     * per-subtask collector from {@link #collectorMap} (inner class so it can access it).
     */
    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT extends Serializable> extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {

        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
            super(source, restoredState, parallelism);
        }

        /**
         * Starts one polling task per subtask reader on {@code executorService}, runs the
         * split enumerator, then blocks until the reader is stopped (e.g. by a checkpoint).
         *
         * @param collector unused here; rows go to the per-subtask collectors instead
         */
        @Override
        public void run(Collector<SeaTunnelRow> collector) throws Exception {
            // NOTE(review): parallelStream is used only to submit tasks — the actual polling
            // happens on executorService threads, so a plain stream would behave the same.
            readerMap.entrySet().parallelStream().forEach(entry -> {
                final AtomicBoolean flag = readerRunningMap.get(entry.getKey());
                final SourceReader<SeaTunnelRow, SplitT> reader = entry.getValue();
                // Route this subtask's rows to its dedicated collector.
                final Collector<SeaTunnelRow> rowCollector = collectorMap.get(entry.getKey());
                executorService.execute(() -> {
                    while (flag.get()) {
                        try {
                            reader.pollNext(rowCollector);
                            Thread.sleep(SLEEP_TIME_INTERVAL);
                        } catch (Exception e) {
                            // Stop both this worker and the outer loop on any failure.
                            // NOTE(review): this RuntimeException is thrown inside an
                            // executor task, so it is not propagated to the caller of run();
                            // also the interrupt status is not restored on
                            // InterruptedException — confirm whether that is intended.
                            this.running = false;
                            flag.set(false);
                            throw new RuntimeException(e);
                        }
                    }
                });
            });
            splitEnumerator.run();
            // Keep run() alive until virtualCheckpoint()/handleNoMoreElement() clears 'running'.
            while (this.running) {
                Thread.sleep(SLEEP_TIME_INTERVAL);
            }
        }

        /**
         * When every subtask has reported no-more-element (superclass clears its own
         * 'running'), also stop the enclosing partition reader's read loop.
         */
        @Override
        protected void handleNoMoreElement(int subtaskId) {
            super.handleNoMoreElement(subtaskId);
            if (!this.running) {
                CoordinatedMicroBatchPartitionReader.this.running = false;
            }
        }
    }
}