blob: b8094d261897844b249fc746e0136a3732906a27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.transforms;
import com.google.common.base.Strings;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaAndValue;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* change case
* @param <R>
*/
public abstract class ChangeCase<R extends ConnectRecord> extends BaseTransformation<R> {
private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
class State {
public final Map<String, String> columnMapping;
public final Schema schema;
State(Map<String, String> columnMapping, Schema schema) {
this.columnMapping = columnMapping;
this.schema = schema;
}
}
Map<Schema, State> schemaState = new HashMap<>();
private ChangeCaseConfig config;
@Override
public void start(KeyValue config) {
this.config = new ChangeCaseConfig(config);
}
@Override
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
// state
final State state = this.schemaState.computeIfAbsent(inputSchema, schema -> {
final SchemaBuilder builder = SchemaBuilder.struct();
if (!Strings.isNullOrEmpty(schema.getName())) {
builder.name(schema.getName());
}
if (schema.isOptional()) {
builder.optional();
}
final Map<String, String> columnMapping = new LinkedHashMap<>();
for (Field field : schema.getFields()) {
final String newFieldName = this.config.from.to(this.config.to, field.getName());
log.trace("processStruct() - Mapped '{}' to '{}'", field.getName(), newFieldName);
columnMapping.put(field.getName(), newFieldName);
builder.field(newFieldName, field.getSchema());
}
return new State(columnMapping, builder.build());
});
final Struct outputStruct = new Struct(state.schema);
for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
final Object value = input.get(kvp.getKey());
outputStruct.put(kvp.getValue(), value);
}
return new SchemaAndValue(state.schema, outputStruct);
}
/**
* transform key
*/
public static class Key extends ChangeCase<ConnectRecord> {
@Override
public ConnectRecord doTransform(ConnectRecord r) {
final SchemaAndValue transformed = process(r, r.getKeySchema(), r.getKey());
ConnectRecord record = new ConnectRecord(
r.getPosition().getPartition(),
r.getPosition().getOffset(),
r.getTimestamp(),
transformed.schema(),
transformed.value(),
r.getSchema(),
r.getData()
);
record.setExtensions(r.getExtensions());
return record;
}
}
/**
* transform value
*/
public static class Value extends ChangeCase<ConnectRecord> {
@Override
public ConnectRecord doTransform(ConnectRecord r) {
final SchemaAndValue transformed = process(r, r.getSchema(), r.getData());
ConnectRecord record = new ConnectRecord(
r.getPosition().getPartition(),
r.getPosition().getOffset(),
r.getTimestamp(),
r.getKeySchema(),
r.getKey(),
transformed.schema(),
transformed.value()
);
record.setExtensions(r.getExtensions());
return record;
}
}
}