blob: e333d37cb5c9a70098832ac6f79f2de953e4ece0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
import java.util.List;
import java.util.Objects;
/**
* An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
* index mapping, One of the rows is fixed, while the other can be swapped for performant changes in
* hot code paths. The {@link RowKind} is inherited from the mutable row.
*/
@Internal
public class EnrichedRowData implements RowData {
private final RowData fixedRow;
// The index mapping is built as follows: positive indexes are indexes refer to mutable row
// positions,
// while negative indexes (with -1 offset) refer to fixed row positions.
// For example an index mapping [0, 1, -1, -2, 2] means:
// * Index 0 -> mutable row index 0
// * Index 1 -> mutable row index 1
// * Index -1 -> fixed row index 0
// * Index -2 -> fixed row index 1
// * Index 2 -> mutable row index 2
private final int[] indexMapping;
private RowData mutableRow;
public EnrichedRowData(RowData fixedRow, int[] indexMapping) {
this.fixedRow = fixedRow;
this.indexMapping = indexMapping;
}
/**
* Replaces the mutable {@link RowData} backing this {@link EnrichedRowData}.
*
* <p>This method replaces the mutable row data in place and does not return a new object. This
* is done for performance reasons.
*/
public EnrichedRowData replaceMutableRow(RowData mutableRow) {
this.mutableRow = mutableRow;
return this;
}
// ---------------------------------------------------------------------------------------------
@Override
public int getArity() {
return indexMapping.length;
}
@Override
public RowKind getRowKind() {
return mutableRow.getRowKind();
}
@Override
public void setRowKind(RowKind kind) {
mutableRow.setRowKind(kind);
}
@Override
public boolean isNullAt(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.isNullAt(index);
} else {
return fixedRow.isNullAt(-(index + 1));
}
}
@Override
public boolean getBoolean(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getBoolean(index);
} else {
return fixedRow.getBoolean(-(index + 1));
}
}
@Override
public byte getByte(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getByte(index);
} else {
return fixedRow.getByte(-(index + 1));
}
}
@Override
public short getShort(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getShort(index);
} else {
return fixedRow.getShort(-(index + 1));
}
}
@Override
public int getInt(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getInt(index);
} else {
return fixedRow.getInt(-(index + 1));
}
}
@Override
public long getLong(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getLong(index);
} else {
return fixedRow.getLong(-(index + 1));
}
}
@Override
public float getFloat(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getFloat(index);
} else {
return fixedRow.getFloat(-(index + 1));
}
}
@Override
public double getDouble(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getDouble(index);
} else {
return fixedRow.getDouble(-(index + 1));
}
}
@Override
public StringData getString(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getString(index);
} else {
return fixedRow.getString(-(index + 1));
}
}
@Override
public DecimalData getDecimal(int pos, int precision, int scale) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getDecimal(index, precision, scale);
} else {
return fixedRow.getDecimal(-(index + 1), precision, scale);
}
}
@Override
public TimestampData getTimestamp(int pos, int precision) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getTimestamp(index, precision);
} else {
return fixedRow.getTimestamp(-(index + 1), precision);
}
}
@Override
public <T> RawValueData<T> getRawValue(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getRawValue(index);
} else {
return fixedRow.getRawValue(-(index + 1));
}
}
@Override
public byte[] getBinary(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getBinary(index);
} else {
return fixedRow.getBinary(-(index + 1));
}
}
@Override
public ArrayData getArray(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getArray(index);
} else {
return fixedRow.getArray(-(index + 1));
}
}
@Override
public MapData getMap(int pos) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getMap(index);
} else {
return fixedRow.getMap(-(index + 1));
}
}
@Override
public RowData getRow(int pos, int numFields) {
int index = indexMapping[pos];
if (index >= 0) {
return mutableRow.getRow(index, numFields);
} else {
return fixedRow.getRow(-(index + 1), numFields);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EnrichedRowData that = (EnrichedRowData) o;
return Objects.equals(this.fixedRow, that.fixedRow)
&& Objects.equals(this.mutableRow, that.mutableRow);
}
@Override
public int hashCode() {
return Objects.hash(fixedRow, mutableRow);
}
@Override
public String toString() {
return mutableRow.getRowKind().shortString()
+ "{"
+ "fixedRow="
+ fixedRow
+ ", mutableRow="
+ mutableRow
+ '}';
}
/**
* Creates a new {@link EnrichedRowData} with the provided {@code fixedRow} as the immutable
* static row, and uses the {@code producedRowFields}, {@code fixedRowFields} and {@code
* mutableRowFields} arguments to compute the indexes mapping.
*
* <p>The {@code producedRowFields} should include the name of fields of the full row once
* mutable and fixed rows are merged, while {@code fixedRowFields} and {@code mutableRowFields}
* should contain respectively the field names of fixed row and mutable row. All the lists are
* ordered with indexes matching the position of the field in the row. As an example, for a
* complete row {@code (a, b, c)} the mutable row might be {@code (a, c)} and the fixed row
* might be {@code (b)}
*/
public static EnrichedRowData from(
RowData fixedRow,
List<String> producedRowFields,
List<String> mutableRowFields,
List<String> fixedRowFields) {
return new EnrichedRowData(
fixedRow, computeIndexMapping(producedRowFields, mutableRowFields, fixedRowFields));
}
/**
* This method computes the index mapping for {@link EnrichedRowData}.
*
* @see EnrichedRowData#from(RowData, List, List, List)
*/
public static int[] computeIndexMapping(
List<String> producedRowFields,
List<String> mutableRowFields,
List<String> fixedRowFields) {
int[] indexMapping = new int[producedRowFields.size()];
for (int i = 0; i < producedRowFields.size(); i++) {
String fieldName = producedRowFields.get(i);
int newIndex = mutableRowFields.indexOf(fieldName);
if (newIndex < 0) {
newIndex = -(fixedRowFields.indexOf(fieldName) + 1);
}
indexMapping[i] = newIndex;
}
return indexMapping;
}
}