blob: 6f108ee295d94c74b0cc68b85a0febff0e58c778 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.format.text;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
import lombok.NonNull;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Serializes a {@link SeaTunnelRow} into a single line of separator-delimited text.
 *
 * <p>Top-level fields are joined with {@code separators[0]}. Nested values (arrays, maps and rows)
 * are joined with the separators found at deeper indexes of the same array, see {@link
 * #convert(Object, SeaTunnelDataType, int)}. Instances are created through {@link #builder()}.
 */
public class TextSerializationSchema implements SerializationSchema {
    private final SeaTunnelRowType seaTunnelRowType;
    // Indexed by nesting level; index 0 separates the top-level fields.
    private final String[] separators;
    private final DateUtils.Formatter dateFormatter;
    private final DateTimeUtils.Formatter dateTimeFormatter;
    private final TimeUtils.Formatter timeFormatter;
    // Charset used to encode the final joined line into bytes.
    private final Charset charset;

    private TextSerializationSchema(
            @NonNull SeaTunnelRowType seaTunnelRowType,
            String[] separators,
            DateUtils.Formatter dateFormatter,
            DateTimeUtils.Formatter dateTimeFormatter,
            TimeUtils.Formatter timeFormatter,
            Charset charset) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.separators = separators;
        this.dateFormatter = dateFormatter;
        this.dateTimeFormatter = dateTimeFormatter;
        this.timeFormatter = timeFormatter;
        this.charset = charset;
    }

    /**
     * Creates a new builder pre-populated with the default separators, formatters and charset.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link TextSerializationSchema}.
     *
     * <p>The builder keeps its own copy of the separator array, so neither the shared default
     * constant nor an array supplied by the caller is ever modified by {@link #delimiter(String)}.
     */
    public static class Builder {
        private SeaTunnelRowType seaTunnelRowType;
        // Private copy: delimiter() writes into this array.
        private String[] separators = TextFormatConstant.SEPARATOR.clone();
        private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
        private DateTimeUtils.Formatter dateTimeFormatter =
                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
        private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
        private Charset charset = StandardCharsets.UTF_8;

        private Builder() {}

        /**
         * Sets the row type describing the fields of the rows to serialize.
         *
         * @param seaTunnelRowType the schema; must be non-null by the time {@link #build()} runs
         * @return this builder
         */
        public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
            this.seaTunnelRowType = seaTunnelRowType;
            return this;
        }

        /**
         * Overrides the top-level field separator (index 0 of the separator array).
         *
         * @param delimiter the string placed between top-level fields
         * @return this builder
         */
        public Builder delimiter(String delimiter) {
            this.separators[0] = delimiter;
            return this;
        }

        /**
         * Replaces the whole separator array.
         *
         * <p>The array is copied, so later changes to the argument do not affect this builder and
         * a later {@link #delimiter(String)} call does not write into the caller's array.
         *
         * @param separators separators indexed by nesting level; must not be null
         * @return this builder
         */
        public Builder separators(@NonNull String[] separators) {
            this.separators = separators.clone();
            return this;
        }

        /**
         * Sets the formatter used for {@code DATE} fields.
         *
         * @param dateFormatter the date formatter
         * @return this builder
         */
        public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
            this.dateFormatter = dateFormatter;
            return this;
        }

        /**
         * Sets the formatter used for {@code TIMESTAMP} fields.
         *
         * @param dateTimeFormatter the date-time formatter
         * @return this builder
         */
        public Builder dateTimeFormatter(DateTimeUtils.Formatter dateTimeFormatter) {
            this.dateTimeFormatter = dateTimeFormatter;
            return this;
        }

        /**
         * Sets the formatter used for {@code TIME} fields.
         *
         * @param timeFormatter the time formatter
         * @return this builder
         */
        public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
            this.timeFormatter = timeFormatter;
            return this;
        }

        /**
         * Sets the charset used to encode the serialized line into bytes.
         *
         * @param charset the output charset
         * @return this builder
         */
        public Builder charset(Charset charset) {
            this.charset = charset;
            return this;
        }

        /**
         * Builds the schema from the current builder state.
         *
         * <p>The separator array is copied so that continuing to use this builder afterwards
         * cannot change the separators of a schema that has already been built.
         *
         * @return a new {@link TextSerializationSchema}
         */
        public TextSerializationSchema build() {
            return new TextSerializationSchema(
                    seaTunnelRowType,
                    separators.clone(),
                    dateFormatter,
                    dateTimeFormatter,
                    timeFormatter,
                    charset);
        }
    }

    /**
     * Serializes one row: converts each field to text, joins the parts with {@code separators[0]}
     * and encodes the result with the configured charset.
     *
     * @param element the row to serialize
     * @return the encoded line
     * @throws IndexOutOfBoundsException if the number of fields in the row differs from the number
     *     of fields declared by the configured row type
     */
    @Override
    public byte[] serialize(SeaTunnelRow element) {
        Object[] fields = element.getFields();
        if (fields.length != seaTunnelRowType.getTotalFields()) {
            throw new IndexOutOfBoundsException(
                    "The data does not match the configured schema information, please check");
        }
        String[] strings = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i), 0);
        }
        return String.join(separators[0], strings).getBytes(charset);
    }

    /**
     * Converts a single value to its textual form according to its declared type.
     *
     * <p>A {@code null} value, and any value of type {@code NULL}, becomes the empty string.
     * Elements of an array, entries of a map and fields of a nested row are joined with {@code
     * separators[level + 1]}; the key and value of a map entry are joined with {@code
     * separators[level + 2]}. Nesting deeper than the separator array allows fails with an {@link
     * ArrayIndexOutOfBoundsException}.
     *
     * @param field the value to convert, may be null
     * @param fieldType the declared type of {@code field}
     * @param level the nesting depth of {@code field}; 0 for top-level fields
     * @return the textual representation
     * @throws SeaTunnelTextFormatException if the SQL type is not handled by this format
     */
    private String convert(Object field, SeaTunnelDataType<?> fieldType, int level) {
        if (field == null) {
            return "";
        }
        switch (fieldType.getSqlType()) {
            case DOUBLE:
            case FLOAT:
            case INT:
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case BIGINT:
            case DECIMAL:
                return field.toString();
            case STRING:
                // NOTE(review): encode/decode round trip through UTF-8; presumably a no-op for
                // well-formed strings - kept as-is to preserve behavior, confirm before removing.
                byte[] bytes = field.toString().getBytes(StandardCharsets.UTF_8);
                return new String(bytes, StandardCharsets.UTF_8);
            case DATE:
                return DateUtils.toString((LocalDate) field, dateFormatter);
            case TIME:
                return TimeUtils.toString((LocalTime) field, timeFormatter);
            case TIMESTAMP:
                return DateTimeUtils.toString((LocalDateTime) field, dateTimeFormatter);
            case NULL:
                return "";
            case BYTES:
                // NOTE(review): always decoded as UTF-8, independent of the configured charset.
                return new String((byte[]) field, StandardCharsets.UTF_8);
            case ARRAY:
                SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
                return Arrays.stream((Object[]) field)
                        .map(f -> convert(f, elementType, level + 1))
                        .collect(Collectors.joining(separators[level + 1]));
            case MAP:
                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
                // Wildcard map: keys and values are only read as Object, so no unchecked cast.
                return ((Map<?, ?>) field)
                        .entrySet().stream()
                                .map(
                                        entry ->
                                                String.join(
                                                        separators[level + 2],
                                                        convert(entry.getKey(), keyType, level + 1),
                                                        convert(
                                                                entry.getValue(),
                                                                valueType,
                                                                level + 1)))
                                .collect(Collectors.joining(separators[level + 1]));
            case ROW:
                Object[] fields = ((SeaTunnelRow) field).getFields();
                String[] strings = new String[fields.length];
                for (int i = 0; i < fields.length; i++) {
                    strings[i] =
                            convert(
                                    fields[i],
                                    ((SeaTunnelRowType) fieldType).getFieldType(i),
                                    level + 1);
                }
                return String.join(separators[level + 1], strings);
            default:
                throw new SeaTunnelTextFormatException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        String.format(
                                "SeaTunnel format text not supported for parsing this type [%s]",
                                fieldType.getSqlType()));
        }
    }
}