blob: acadf99990bfb195f16409557ca50b9bff9f6af1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableWrite;
import org.apache.paimon.table.sink.WriteBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@Slf4j
public class PaimonSinkWriter
implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>,
SupportMultiTableSinkWriter<Void> {
private String commitUser = UUID.randomUUID().toString();
private final WriteBuilder tableWriteBuilder;
private final TableWrite tableWrite;
private long checkpointId = 0;
private List<CommitMessage> committables = new ArrayList<>();
private final Table table;
private final SeaTunnelRowType seaTunnelRowType;
private final SinkWriter.Context context;
private final JobContext jobContext;
private TableSchema tableSchema;
public PaimonSinkWriter(
Context context,
Table table,
SeaTunnelRowType seaTunnelRowType,
JobContext jobContext,
PaimonHadoopConfiguration paimonHadoopConfiguration) {
this.table = table;
this.tableWriteBuilder =
JobContextUtil.isBatchJob(jobContext)
? this.table.newBatchWriteBuilder()
: this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
this.tableSchema = ((FileStoreTable) table).schema();
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
public PaimonSinkWriter(
Context context,
Table table,
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states,
JobContext jobContext,
PaimonHadoopConfiguration paimonHadoopConfiguration) {
this(context, table, seaTunnelRowType, jobContext, paimonHadoopConfiguration);
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
this.commitUser = states.get(0).getCommitUser();
this.checkpointId = states.get(0).getCheckpointId();
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
.map(PaimonSinkState::getCommittables)
.flatMap(List::stream)
.collect(Collectors.toList());
log.info("Trying to recommit states {}", commitables);
if (JobContextUtil.isBatchJob(jobContext)) {
log.debug("Trying to recommit states batch mode");
((BatchTableCommit) tableCommit).commit(commitables);
} else {
log.debug("Trying to recommit states streaming mode");
((StreamTableCommit) tableCommit).commit(Objects.hash(commitables), commitables);
}
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
}
}
@Override
public void write(SeaTunnelRow element) throws IOException {
InternalRow rowData = RowConverter.reconvert(element, seaTunnelRowType, tableSchema);
try {
PaimonSecurityContext.runSecured(
() -> {
tableWrite.write(rowData);
return null;
});
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_RECORD_FAILED,
"This record " + element + " failed to be written",
e);
}
}
@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
try {
List<CommitMessage> fileCommittables;
if (JobContextUtil.isBatchJob(jobContext)) {
fileCommittables = ((BatchTableWrite) tableWrite).prepareCommit();
} else {
fileCommittables =
((StreamTableWrite) tableWrite).prepareCommit(false, committables.size());
}
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables));
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_PRE_COMMIT_FAILED,
"Flink table store failed to prepare commit",
e);
}
}
@Override
public List<PaimonSinkState> snapshotState(long checkpointId) throws IOException {
this.checkpointId = checkpointId;
PaimonSinkState paimonSinkState =
new PaimonSinkState(new ArrayList<>(committables), commitUser, checkpointId);
committables.clear();
return Collections.singletonList(paimonSinkState);
}
@Override
public void abortPrepare() {}
@Override
public void close() throws IOException {
if (Objects.nonNull(tableWrite)) {
try {
tableWrite.close();
} catch (Exception e) {
log.error("Failed to close table writer in paimon sink writer.", e);
throw new SeaTunnelException(e);
}
}
}
}