blob: c4699db3beda38a46472c1c1ceafb355edcb1bca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources.processor.maxwell;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.api.java.JavaRDD;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Pattern;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.DATE_STRING;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.EPOCHMILLISECONDS;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.NON_TIMESTAMP;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.UNIX_TIMESTAMP;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.valueOf;
/**
* A {@link JsonKafkaSourcePostProcessor} help to extract fresh data from maxwell json string and tag the record as
* delete or not.
*/
public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final Option<String> databaseRegex;
private final String tableRegex;
public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
super(props);
databaseRegex = Option.ofNullable(getStringWithAltKeys(props, JsonKafkaPostProcessorConfig.DATABASE_NAME_REGEX, true));
tableRegex = getStringWithAltKeys(props, JsonKafkaPostProcessorConfig.TABLE_NAME_REGEX);
}
// ------------------------------------------------------------------------
// Partial fields in maxwell json string
// ------------------------------------------------------------------------
private static final String DATABASE = "database";
private static final String TABLE = "table";
private static final String DATA = "data";
private static final String OPERATION_TYPE = "type";
private static final String TS = "ts";
// ------------------------------------------------------------------------
// Operation types
// ------------------------------------------------------------------------
private static final String INSERT = "insert";
private static final String UPDATE = "update";
private static final String DELETE = "delete";
@Override
public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
return maxwellJsonRecords.map(record -> {
JsonNode inputJson = MAPPER.readTree(record);
String database = inputJson.get(DATABASE).textValue();
String table = inputJson.get(TABLE).textValue();
// filter out target databases and tables
if (isTargetTable(database, table)) {
ObjectNode result = (ObjectNode) inputJson.get(DATA);
String type = inputJson.get(OPERATION_TYPE).textValue();
// insert or update
if (INSERT.equals(type) || UPDATE.equals(type)) {
// tag this record not delete.
result.put(HoodieRecord.HOODIE_IS_DELETED_FIELD, false);
return result.toString();
// delete
} else if (DELETE.equals(type)) {
return processDelete(inputJson, result);
} else {
// there might be some ddl data, ignore it
return null;
}
} else {
// not the data from target table(s), ignore it
return null;
}
}).filter(Objects::nonNull);
}
private String processDelete(JsonNode inputJson, ObjectNode result) {
// tag this record as delete.
result.put(HoodieRecord.HOODIE_IS_DELETED_FIELD, true);
PreCombineFieldType preCombineFieldType =
valueOf(getStringWithAltKeys(
this.props, JsonKafkaPostProcessorConfig.PRECOMBINE_FIELD_TYPE, true).toUpperCase(Locale.ROOT));
// maxwell won't update the `update_time`(delete time) field of the record which is tagged as delete. so if we
// want to delete this record correctly, we should update its `update_time` to a time closer to where the
// delete operation actually occurred. here we use `ts` from maxwell json string as this 'delete' time.
// we can update the `update_time`(delete time) only when it is in timestamp format.
if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
// ts from maxwell
long ts = inputJson.get(TS).longValue();
// convert the `update_time`(delete time) to the proper format.
if (preCombineFieldType.equals(DATE_STRING)) {
// DATE_STRING format
String timeFormat = getStringWithAltKeys(this.props, JsonKafkaPostProcessorConfig.PRECOMBINE_FIELD_FORMAT, true);
result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
} else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
// EPOCHMILLISECONDS format
result.put(preCombineField, ts * 1000L);
} else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
// UNIX_TIMESTAMP format
result.put(preCombineField, ts);
} else {
throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
}
}
return result.toString();
}
/**
* Check if it is the right table we want to consume from.
*
* @param database database the data belong to
* @param table table the data belong to
*/
private boolean isTargetTable(String database, String table) {
if (!databaseRegex.isPresent()) {
return Pattern.matches(tableRegex, table);
} else {
return Pattern.matches(databaseRegex.get(), database) && Pattern.matches(tableRegex, table);
}
}
}