/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.translation.spark.common.source.micro;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.spark.common.ReaderState;
import org.apache.seatunnel.translation.spark.common.source.batch.ParallelBatchPartitionReader;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
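
/**
 * Micro-batch {@link ParallelBatchPartitionReader} that restores reader state from the previous
 * checkpoint on startup and, after {@code checkpointInterval} milliseconds, takes a "virtual
 * checkpoint": it snapshots the source state, persists it to HDFS, and stops the current
 * micro-batch so the next one can resume from the saved state.
 */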
public class ParallelMicroBatchPartitionReader extends ParallelBatchPartitionReader {
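
    /** Sleep interval (in milliseconds) while waiting for the handover queue to drain. */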
    protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;

    protected volatile Integer checkpointId;
    protected final Integer checkpointInterval;
    protected final String checkpointPath;
    protected final String hdfsRoot;
    protected final String hdfsUser;

    protected Map<Integer, List<byte[]>> restoredState;
    protected ScheduledThreadPoolExecutor executor;
    protected FileSystem fileSystem;

    public ParallelMicroBatchPartitionReader(
            SeaTunnelSource<SeaTunnelRow, ?, ?> source,
            Integer parallelism,
            Integer subtaskId,
            Integer checkpointId,
            Integer checkpointInterval,
            String checkpointPath,
            String hdfsRoot,
            String hdfsUser) {
        super(source, parallelism, subtaskId);
        this.checkpointId = checkpointId;
        this.checkpointInterval = checkpointInterval;
        this.checkpointPath = checkpointPath;
        this.hdfsRoot = hdfsRoot;
        this.hdfsUser = hdfsUser;
    }
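
    /** Builds the internal source, passing in any state restored from the previous checkpoint. */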
    @Override
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalParallelSource<>(source, restoredState, parallelism, subtaskId);
    }
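
    /**
     * Opens the HDFS file system, restores the state saved by the previous micro-batch (if any),
     * then starts the reader and schedules the virtual checkpoint.
     */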
    @Override
    protected void prepare() {
        try {
            this.fileSystem = getFileSystem();
            // The previous micro-batch saved its state under checkpointId - 1.
            this.restoredState = restoreState(checkpointId - 1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        super.prepare();
        prepareCheckpoint();
    }
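
    /**
     * Opens the checkpoint file system rooted at {@code hdfsRoot}, acting as {@code hdfsUser}
     * when one is configured.
     */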
    protected FileSystem getFileSystem()
            throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsRoot);
        if (StringUtils.isNotBlank(hdfsUser)) {
            return FileSystem.get(new URI(hdfsRoot), configuration, hdfsUser);
        } else {
            return FileSystem.get(new URI(hdfsRoot), configuration);
        }
    }
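
    /**
     * Snapshots the internal source state under the current checkpoint id, then advances
     * {@code checkpointId} for the next checkpoint.
     */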
    protected ReaderState snapshotState() {
        Map<Integer, List<byte[]>> bytes;
        try {
            bytes = internalSource.snapshotState(checkpointId);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // The post-increment records the current id in the ReaderState before advancing it.
        return new ReaderState(bytes, subtaskId, checkpointId++);
    }
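
    /**
     * Schedules a single virtual checkpoint {@code checkpointInterval} milliseconds from now;
     * firing it ends the current micro-batch.
     */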
    public void prepareCheckpoint() {
        executor = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
                1, String.format("parallel-reader-checkpoint-executor-%s", subtaskId));
        executor.schedule(this::virtualCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS);
    }
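
    /**
     * Waits for the handover queue to drain, snapshots and persists the reader state, notifies
     * the source that the checkpoint is complete, and stops the reader to end the micro-batch.
     */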
    public void virtualCheckpoint() {
        try {
            synchronized (checkpointLock) {
                // Wait until all rows already handed over have been consumed by #next().
                while (!handover.isEmpty()) {
                    Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
                }
                // Block #next() method
                synchronized (handover) {
                    final int currentCheckpoint = checkpointId;
                    ReaderState readerState = snapshotState();
                    saveState(readerState, currentCheckpoint);
                    internalSource.notifyCheckpointComplete(currentCheckpoint);
                    // Stop the reader; the micro-batch ends at this checkpoint boundary.
                    running = false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
        }
    }
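
    /**
     * Reads the serialized {@link ReaderState} for the given checkpoint id from HDFS, or returns
     * {@code null} when no state file exists (e.g. on the first micro-batch).
     */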
    private Map<Integer, List<byte[]>> restoreState(int checkpointId) throws IOException {
        Path hdfsPath = getCheckpointPathWithId(checkpointId);
        if (!fileSystem.exists(hdfsPath)) {
            return null;
        }
        try (FSDataInputStream inputStream = fileSystem.open(hdfsPath);
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            final int defaultLen = 1024;
            byte[] buffer = new byte[defaultLen];
            int i;
            // Copy the whole state file into memory before deserializing it.
            while ((i = inputStream.read(buffer)) != -1) {
                out.write(buffer, 0, i);
            }
            return ((ReaderState) SerializationUtils.deserialize(out.toByteArray())).getBytes();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
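
    /**
     * Serializes the {@link ReaderState} and writes it to the HDFS path derived from the subtask
     * id and checkpoint id, creating the file first if it does not yet exist.
     */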
    protected void saveState(ReaderState readerState, int checkpointId) throws IOException {
        byte[] bytes = SerializationUtils.serialize(readerState);
        Path hdfsPath = getCheckpointPathWithId(checkpointId);
        if (!fileSystem.exists(hdfsPath)) {
            fileSystem.createNewFile(hdfsPath);
        }
        try (FSDataOutputStream outputStream = fileSystem.append(hdfsPath)) {
            outputStream.write(bytes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
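
    /** Resolves the state file path: {@code <checkpointPath>/<subtaskId>/<checkpointId>}. */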
    private Path getCheckpointPathWithId(int checkpointId) {
        return new Path(
                this.checkpointPath + File.separator + this.subtaskId + File.separator + checkpointId);
    }
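
    /** Releases the HDFS handle and the checkpoint scheduler, then closes the parent reader. */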
    @Override
    public void close() throws IOException {
        fileSystem.close();
        executor.shutdown();
        super.close();
    }
}