blob: 2c2e1b483a5b1339a7413503ecc8cdf2368d95ec [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.tools.cdc.mongodb;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.bson.Document;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
public class MongoDBSchema extends SourceSchema {
public MongoDBSchema(
ArrayList<Document> sampleData,
String databaseName,
String tableName,
String tableComment)
throws Exception {
super(databaseName, null, tableName, tableComment);
fields = new LinkedHashMap<>();
for (Document data : sampleData) {
processSampleData(data);
}
primaryKeys = new ArrayList<>();
primaryKeys.add("_id");
}
private void processSampleData(Document sampleData) {
for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
String dorisType = MongoDBType.toDorisType(value);
if (isDecimalField(fieldName)) {
dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
}
fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
}
}
private boolean isDecimalField(String fieldName) {
FieldSchema existingField = fields.get(fieldName);
return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL);
}
private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) {
FieldSchema existingField = fields.get(fieldName);
if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) {
Tuple2<Integer, Integer> existingPrecisionAndScale =
MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString());
int existingPrecision = existingPrecisionAndScale.f0;
int existingScale = existingPrecisionAndScale.f1;
Tuple2<Integer, Integer> currentPrecisionAndScale =
MongoDBType.getDecimalPrecisionAndScale(newDorisType);
int currentPrecision = currentPrecisionAndScale.f0;
int currentScale = currentPrecisionAndScale.f1;
int newScale = Math.max(existingScale, currentScale);
int newIntegerPartSize =
Math.max(existingPrecision - existingScale, currentPrecision - currentScale);
int newPrecision = newIntegerPartSize + newScale;
return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")";
}
return newDorisType;
}
@Override
public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
return null;
}
@Override
public String getCdcTableName() {
return databaseName + "\\." + tableName;
}
}