IGNITE-21732: Sql. Split TableRowConverterImpl on two different implementations (#3687)

diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index ec2b373..b3753a1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -22,14 +22,18 @@
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.BitSet;
 import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -38,6 +42,7 @@
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.type.NativeTypeSpec;
 import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Adapter for row of older schema.
@@ -440,13 +445,56 @@
         return mappedId < 0 ? (Instant) column.defaultValue() : super.timestampValue(mappedId);
     }
 
+    @Override
+    public boolean hasNullValue(int colIdx) {
+        int mappedId = mapColumn(colIdx);
+
+        return mappedId < 0
+                ? mapper.mappedColumn(colIdx).defaultValue() == null
+                : super.hasNullValue(mappedId);
+    }
+
+
+    @Override
+    public int elementCount() {
+        return newBinaryTupleSchema.elementCount();
+    }
+
     /** {@inheritDoc} */
     @Override
-    public BinaryTuple binaryTuple() {
+    public @Nullable BinaryTuple binaryTuple() {
         // Underlying binary tuple can not be used directly.
         return null;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer byteBuffer() {
+        // TODO: IGNITE-22156 Replace inheritance with delegation and drop this code.
+        int size = newBinaryTupleSchema.elementCount();
+        var builder = new BinaryTupleBuilder(size);
+
+        for (int col = 0; col < size; col++) {
+            Element element = newBinaryTupleSchema.element(col);
+
+            BinaryRowConverter.appendValue(builder, element, value(col));
+        }
+
+        return new BinaryTuple(size, builder.build()).byteBuffer();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int tupleSliceLength() {
+        throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer tupleSlice() {
+        throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
+    }
+
     private void ensureTypeConversionAllowed(ColumnType from, ColumnType to) throws InvalidTypeException {
         if (!isSupportedColumnTypeChange(from, to)) {
             throw new SchemaException(format("Type conversion is not allowed: {} -> {}", from, to));
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterImpl.java
new file mode 100644
index 0000000..d7ece09
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.BitSet;
+import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
+import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;
+
+/**
+ * Converts rows to execution engine representation.
+ */
+public class ProjectedTableRowConverterImpl extends TableRowConverterImpl {
+    /**
+     * Mapping of required columns to their indexes in physical schema.
+     */
+    private final int[] requiredColumnsMapping;
+
+    private final BinaryTupleSchema fullTupleSchema;
+
+    /** Constructor. */
+    ProjectedTableRowConverterImpl(
+            SchemaRegistry schemaRegistry,
+            BinaryTupleSchema fullTupleSchema,
+            SchemaDescriptor schemaDescriptor,
+            BitSet requiredColumns
+    ) {
+        super(schemaRegistry, schemaDescriptor);
+
+        this.fullTupleSchema = fullTupleSchema;
+
+        int size = requiredColumns.cardinality();
+
+        requiredColumnsMapping = new int[size];
+
+        int requiredIndex = 0;
+        for (Column column : schemaDescriptor.columns()) {
+            if (requiredColumns.get(column.positionInRow())) {
+                requiredColumnsMapping[requiredIndex++] = column.positionInRow();
+            }
+        }
+    }
+
+    @Override
+    public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow tableRow, RowFactory<RowT> factory) {
+        InternalTuple tuple;
+        if (tableRow.schemaVersion() == schemaDescriptor.version()) {
+            BinaryTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
+
+            tuple = new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
+        } else {
+            InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);
+
+            tuple = new FieldDeserializingProjectedTuple(
+                    fullTupleSchema,
+                    tableTuple,
+                    requiredColumnsMapping
+            );
+        }
+
+        return factory.create(tuple);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactoryImpl.java
index fa9e932..8ce0c7a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactoryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterFactoryImpl.java
@@ -54,9 +54,7 @@
 
         fullRowConverter = new TableRowConverterImpl(
                 schemaRegistry,
-                fullTupleSchema,
-                schemaDescriptor,
-                null
+                schemaDescriptor
         );
     }
 
@@ -66,7 +64,7 @@
             return fullRowConverter;
         }
 
-        return new TableRowConverterImpl(
+        return new ProjectedTableRowConverterImpl(
                 schemaRegistry,
                 fullTupleSchema,
                 schemaDescriptor,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
index d27aba4..ac3a5fc 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterImpl.java
@@ -17,62 +17,29 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import java.util.BitSet;
 import org.apache.ignite.internal.lang.InternalTuple;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
-import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Converts rows to execution engine representation.
  */
 public class TableRowConverterImpl implements TableRowConverter {
 
-    private final SchemaRegistry schemaRegistry;
+    protected final SchemaRegistry schemaRegistry;
 
-    private final SchemaDescriptor schemaDescriptor;
-
-    private final BinaryTupleSchema fullTupleSchema;
-
-    /**
-     * Mapping of required columns to their indexes in physical schema.
-     */
-    private final int[] requiredColumnsMapping;
-
-    private final boolean skipTrimming;
+    protected final SchemaDescriptor schemaDescriptor;
 
     /** Constructor. */
     TableRowConverterImpl(
             SchemaRegistry schemaRegistry,
-            BinaryTupleSchema fullTupleSchema,
-            SchemaDescriptor schemaDescriptor,
-            @Nullable BitSet requiredColumns
+            SchemaDescriptor schemaDescriptor
     ) {
         this.schemaRegistry = schemaRegistry;
         this.schemaDescriptor = schemaDescriptor;
-        this.fullTupleSchema = fullTupleSchema;
-
-        this.skipTrimming = requiredColumns == null;
-
-        int size = requiredColumns == null
-                ? schemaDescriptor.length()
-                : requiredColumns.cardinality();
-
-        requiredColumnsMapping = new int[size];
-
-        int requiredIndex = 0;
-        for (Column column : schemaDescriptor.columns()) {
-            if (requiredColumns == null || requiredColumns.get(column.positionInRow())) {
-                requiredColumnsMapping[requiredIndex++] = column.positionInRow();
-            }
-        }
     }
 
     /** {@inheritDoc} */
@@ -99,20 +66,11 @@
             RowHandler.RowFactory<RowT> factory
     ) {
         InternalTuple tuple;
+
         if (tableRow.schemaVersion() == schemaDescriptor.version()) {
-            InternalTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
-
-            tuple = skipTrimming
-                    ? tableTuple
-                    : new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
+            tuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
         } else {
-            InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);
-
-            tuple = new FieldDeserializingProjectedTuple(
-                    fullTupleSchema,
-                    tableTuple,
-                    requiredColumnsMapping
-            );
+            tuple = schemaRegistry.resolve(tableRow, schemaDescriptor);
         }
 
         return factory.create(tuple);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterSelfTest.java
new file mode 100644
index 0000000..796ac92
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ProjectedTableRowConverterSelfTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.BitSets;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link TableRowConverterImpl}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class ProjectedTableRowConverterSelfTest extends BaseIgniteAbstractTest {
+
+    @Mock
+    private SchemaRegistry schemaRegistry;
+
+    @Mock
+    private ExecutionContext<RowWrapper> executionContext;
+
+    /** Test checks conversion from storage row to execution engine row. */
+    @Test
+    public void testToEngineRowSameVersion() {
+        SchemaDescriptor schema = newSchema(
+                List.of(
+                        Map.entry("c3", NativeTypes.UUID),
+                        Map.entry("c2", NativeTypes.STRING),
+                        Map.entry("c4", NativeTypes.BOOLEAN),
+                        Map.entry("c1", NativeTypes.INT32)
+                        ),
+                List.of("c1"),
+                null
+        );
+
+        RowSchema rowSchema = RowSchema.builder()
+                .addField(NativeTypes.STRING)
+                .addField(NativeTypes.INT32)
+                .build();
+
+        RowHandler<RowWrapper> rowHandler = SqlRowHandler.INSTANCE;
+        RowFactory<RowWrapper> rowFactory = rowHandler.factory(rowSchema);
+
+        ByteBuffer tupleBuf = new BinaryTupleBuilder(schema.length())
+                .appendUuid(UUID.randomUUID())
+                .appendString("ABC")
+                .appendBoolean(true)
+                .appendInt(100)
+                .build();
+
+        BinaryRow binaryRow = new BinaryRowImpl(schema.version(), tupleBuf);
+
+        ProjectedTableRowConverterImpl converter = new ProjectedTableRowConverterImpl(
+                schemaRegistry,
+                BinaryTupleSchema.createRowSchema(schema),
+                schema,
+                BitSets.of(1, 3)
+        );
+
+        RowWrapper row = converter.toRow(executionContext, binaryRow, rowFactory);
+        assertEquals("ABC", rowHandler.get(0, row));
+        assertEquals(100, rowHandler.get(1, row));
+    }
+
+    private static SchemaDescriptor newSchema(
+            List<Map.Entry<String, NativeType>> columns,
+            List<String> keyColumns,
+            @Nullable List<String> colocationColumns
+    ) {
+        List<Column> columnList = columns.stream()
+                .map(entry -> {
+                    String name = entry.getKey();
+                    NativeType type = entry.getValue();
+
+                    return new Column(name, type, !keyColumns.contains(name));
+                })
+                .collect(Collectors.toList());
+        return new SchemaDescriptor(
+                1,
+                columnList,
+                keyColumns,
+                colocationColumns
+        );
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterSelfTest.java
index 42f2470..f616817 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterSelfTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TableRowConverterSelfTest.java
@@ -33,7 +33,6 @@
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -94,9 +93,7 @@
 
         TableRowConverterImpl converter = new TableRowConverterImpl(
                 schemaRegistry,
-                BinaryTupleSchema.createRowSchema(schema),
-                schema,
-                null
+                schema
         );
 
         RowWrapper row = converter.toRow(executionContext, binaryRow, rowFactory);
@@ -134,9 +131,7 @@
 
         TableRowConverterImpl converter = new TableRowConverterImpl(
                 schemaRegistry,
-                BinaryTupleSchema.createRowSchema(schema),
-                schema,
-                null
+                schema
         );
         BinaryRowEx convertedRow = converter.toFullRow(executionContext, wrapper);