/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.rocketmq.connect.runtime.errors; |
| |
| import io.openmessaging.connector.api.data.ConnectRecord; |
| import io.openmessaging.connector.api.errors.ConnectException; |
| import org.apache.rocketmq.common.message.MessageExt; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Objects; |
| |
| /** |
| * contains all info current time |
| */ |
| class ProcessingContext implements AutoCloseable { |
| |
| /** |
| * reporters |
| */ |
| private Collection<ErrorReporter> reporters = Collections.emptyList(); |
| |
| /** |
| * send message |
| */ |
| private MessageExt consumedMessage; |
| |
| /** |
| * original source record |
| */ |
| private ConnectRecord sourceRecord; |
| |
| /** |
| * stage |
| */ |
| private ErrorReporter.Stage stage; |
| private Class<?> klass; |
| |
| /** |
| * attempt |
| */ |
| private int attempt; |
| /** |
| * error message |
| */ |
| private Throwable error; |
| |
| /** |
| * reset info |
| */ |
| private void reset() { |
| attempt = 0; |
| stage = null; |
| klass = null; |
| error = null; |
| } |
| |
| /** |
| * Set the record consumed from Kafka in a sink connector. |
| * |
| * @param consumedMessage the record |
| */ |
| public void consumerRecord(MessageExt consumedMessage) { |
| this.consumedMessage = consumedMessage; |
| reset(); |
| } |
| |
| /** |
| * @return the record consumed from Kafka. could be null |
| */ |
| public MessageExt consumerRecord() { |
| return consumedMessage; |
| } |
| |
| /** |
| * @return the source record being processed. |
| */ |
| public ConnectRecord sourceRecord() { |
| return sourceRecord; |
| } |
| |
| /** |
| * Set the source record being processed in the connect pipeline. |
| * |
| * @param record the source record |
| */ |
| public void sourceRecord(ConnectRecord record) { |
| this.sourceRecord = record; |
| reset(); |
| } |
| |
| /** |
| * Set the stage in the connector pipeline which is currently executing. |
| * |
| * @param stage the stage |
| */ |
| public void stage(ErrorReporter.Stage stage) { |
| this.stage = stage; |
| } |
| |
| /** |
| * @return the stage in the connector pipeline which is currently executing. |
| */ |
| public ErrorReporter.Stage stage() { |
| return stage; |
| } |
| |
| /** |
| * @return the class which is going to execute the current operation. |
| */ |
| public Class<?> executingClass() { |
| return klass; |
| } |
| |
| /** |
| * @param klass set the class which is currently executing. |
| */ |
| public void executingClass(Class<?> klass) { |
| this.klass = klass; |
| } |
| |
| /** |
| * A helper method to set both the stage and the class. |
| * |
| * @param stage the stage |
| * @param klass the class which will execute the operation in this stage. |
| */ |
| public void currentContext(ErrorReporter.Stage stage, Class<?> klass) { |
| stage(stage); |
| executingClass(klass); |
| } |
| |
| |
| /** |
| * report errors |
| */ |
| public void report() { |
| if (reporters.size() == 1) { |
| reporters.iterator().next().report(this); |
| } |
| reporters.stream().forEach(r -> r.report(this)); |
| } |
| |
| |
| /** |
| * @param attempt the number of attempts made to execute the current operation. |
| */ |
| public void attempt(int attempt) { |
| this.attempt = attempt; |
| } |
| |
| public int attempt() { |
| return attempt; |
| } |
| |
| |
| public Throwable error() { |
| return error; |
| } |
| |
| /** |
| * set error |
| * |
| * @param error |
| */ |
| public void error(Throwable error) { |
| this.error = error; |
| } |
| |
| |
| /** |
| * @return |
| */ |
| public boolean failed() { |
| return error() != null; |
| } |
| |
| |
| /** |
| * set reporters |
| * |
| * @param reporters |
| */ |
| public void reporters(Collection<ErrorReporter> reporters) { |
| Objects.requireNonNull(reporters); |
| this.reporters = reporters; |
| } |
| |
| @Override |
| public void close() { |
| ConnectException e = null; |
| for (ErrorReporter reporter : reporters) { |
| try { |
| reporter.close(); |
| } catch (Throwable t) { |
| e = e != null ? e : new ConnectException("Failed to close all reporters"); |
| e.addSuppressed(t); |
| } |
| } |
| if (e != null) { |
| throw e; |
| } |
| } |
| |
| public String toString(boolean includeMessage) { |
| StringBuilder builder = new StringBuilder(); |
| builder.append("Executing stage '"); |
| builder.append(stage().name()); |
| builder.append("' with class '"); |
| builder.append(executingClass() == null ? "null" : executingClass().getName()); |
| builder.append('\''); |
| if (includeMessage && sourceRecord() != null) { |
| builder.append(", where source record is = "); |
| builder.append(sourceRecord()); |
| } else if (includeMessage && consumerRecord() != null) { |
| MessageExt msg = consumerRecord(); |
| builder.append(", where consumed record is "); |
| builder.append("{topic='").append(consumedMessage.getTopic()).append('\''); |
| builder.append(", partition=").append(msg.getQueueId()); |
| builder.append(", offset=").append(msg.getQueueOffset()); |
| builder.append(", bornTimestamp=").append(msg.getBornTimestamp()); |
| builder.append(", storeTimestamp=").append(msg.getStoreTimestamp()); |
| builder.append("}"); |
| } |
| builder.append('.'); |
| return builder.toString(); |
| } |
| } |