IGNITE-22156 Replace inheritance with delegation for UpgradingRowAdapter class. (#3696)

diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/table/ClientHandlerTupleTests.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/table/ClientHandlerTupleTests.java
index 93c94e5..a2e653a 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/table/ClientHandlerTupleTests.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/table/ClientHandlerTupleTests.java
@@ -40,7 +40,7 @@
 import java.time.Month;
 import java.util.Random;
 import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
@@ -91,7 +91,7 @@
     public void testTupleEquality() throws TupleMarshallerException {
         Tuple tuple = createTuple();
 
-        BinaryTuple binaryTuple = new TupleMarshallerImpl(fullSchema).marshal(tuple).binaryTuple();
+        BinaryTupleReader binaryTuple = new TupleMarshallerImpl(fullSchema).marshal(tuple).binaryTuple();
         Tuple clientHandlerTuple = new ClientHandlerTuple(fullSchema, null, binaryTuple, false);
 
         assertEquals(tuple, clientHandlerTuple);
@@ -101,7 +101,7 @@
     public void testTupleEqualityKeyOnly() throws TupleMarshallerException {
         Tuple tuple = createKeyTuple();
 
-        BinaryTuple binaryTuple = new TupleMarshallerImpl(fullSchema).marshalKey(tuple).binaryTuple();
+        BinaryTupleReader binaryTuple = new TupleMarshallerImpl(fullSchema).marshalKey(tuple).binaryTuple();
         Tuple clientHandlerTuple = new ClientHandlerTuple(fullSchema, null, binaryTuple, true);
 
         assertEquals(tuple, clientHandlerTuple);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index b3753a1..0f11ab6 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -47,7 +47,7 @@
 /**
  * Adapter for row of older schema.
  */
-public class UpgradingRowAdapter extends Row {
+public class UpgradingRowAdapter implements Row {
     /** Column mapper. */
     private final ColumnMapper mapper;
 
@@ -56,8 +56,11 @@
 
     private final BinaryTupleSchema newBinaryTupleSchema;
 
+    /** Row of previous version. */
+    private final Row row;
+
     private UpgradingRowAdapter(SchemaDescriptor newSchema, BinaryTupleSchema newBinaryTupleSchema, Row row, ColumnMapper mapper) {
-        super(false, row.schema(), row.binaryTupleSchema(), row);
+        this.row = row;
 
         this.newSchema = newSchema;
         this.mapper = mapper;
@@ -100,6 +103,12 @@
         return newSchema.version();
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public boolean keyOnly() {
+        return false;
+    }
+
     /**
      * Map column.
      *
@@ -125,13 +134,13 @@
     public boolean booleanValue(int colIdx) {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.BOOLEAN != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (boolean) column.defaultValue() : super.booleanValue(mappedId);
+        return mappedId < 0 ? (boolean) column.defaultValue() : row.booleanValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -139,13 +148,13 @@
     public Boolean booleanValueBoxed(int colIdx) {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.BOOLEAN != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (Boolean) column.defaultValue() : super.booleanValueBoxed(mappedId);
+        return mappedId < 0 ? (Boolean) column.defaultValue() : row.booleanValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -153,11 +162,11 @@
     public byte byteValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT8);
 
-        return mappedId < 0 ? (byte) column.defaultValue() : super.byteValue(mappedId);
+        return mappedId < 0 ? (byte) column.defaultValue() : row.byteValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -165,11 +174,11 @@
     public Byte byteValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT8);
 
-        return mappedId < 0 ? (Byte) column.defaultValue() : super.byteValueBoxed(mappedId);
+        return mappedId < 0 ? (Byte) column.defaultValue() : row.byteValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -177,11 +186,11 @@
     public short shortValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT16);
 
-        return mappedId < 0 ? (short) column.defaultValue() : super.shortValue(mappedId);
+        return mappedId < 0 ? (short) column.defaultValue() : row.shortValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -189,11 +198,11 @@
     public Short shortValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT16);
 
-        return mappedId < 0 ? (Short) column.defaultValue() : super.shortValueBoxed(mappedId);
+        return mappedId < 0 ? (Short) column.defaultValue() : row.shortValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -201,11 +210,11 @@
     public int intValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT32);
 
-        return mappedId < 0 ? (int) column.defaultValue() : super.intValue(mappedId);
+        return mappedId < 0 ? (int) column.defaultValue() : row.intValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -213,11 +222,11 @@
     public Integer intValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT32);
 
-        return mappedId < 0 ? (Integer) column.defaultValue() : super.intValueBoxed(mappedId);
+        return mappedId < 0 ? (Integer) column.defaultValue() : row.intValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -225,11 +234,11 @@
     public long longValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT64);
 
-        return mappedId < 0 ? (long) column.defaultValue() : super.longValue(mappedId);
+        return mappedId < 0 ? (long) column.defaultValue() : row.longValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -237,11 +246,11 @@
     public Long longValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.INT64);
 
-        return mappedId < 0 ? (Long) column.defaultValue() : super.longValueBoxed(mappedId);
+        return mappedId < 0 ? (Long) column.defaultValue() : row.longValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -249,11 +258,11 @@
     public float floatValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.FLOAT);
 
-        return mappedId < 0 ? (float) column.defaultValue() : super.floatValue(mappedId);
+        return mappedId < 0 ? (float) column.defaultValue() : row.floatValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -261,11 +270,11 @@
     public Float floatValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.FLOAT);
 
-        return mappedId < 0 ? (Float) column.defaultValue() : super.floatValueBoxed(mappedId);
+        return mappedId < 0 ? (Float) column.defaultValue() : row.floatValueBoxed(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -273,11 +282,11 @@
     public double doubleValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.DOUBLE);
 
-        return mappedId < 0 ? (double) column.defaultValue() : super.doubleValue(mappedId);
+        return mappedId < 0 ? (double) column.defaultValue() : row.doubleValue(mappedId);
     }
 
 
@@ -286,37 +295,37 @@
     public Double doubleValueBoxed(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         ensureTypeConversionAllowed(column.type().spec().asColumnType(), ColumnType.DOUBLE);
 
-        return mappedId < 0 ? (Double) column.defaultValue() : super.doubleValueBoxed(mappedId);
+        return mappedId < 0 ? (Double) column.defaultValue() : row.doubleValueBoxed(mappedId);
     }
 
     @Override
     public BigDecimal decimalValue(int colIdx) {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.DECIMAL != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (BigDecimal) column.defaultValue() : super.decimalValue(mappedId);
+        return mappedId < 0 ? (BigDecimal) column.defaultValue() : row.decimalValue(mappedId);
     }
 
     @Override
     public BigDecimal decimalValue(int colIdx, int scale) {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.DECIMAL != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (BigDecimal) column.defaultValue() : super.decimalValue(mappedId, scale);
+        return mappedId < 0 ? (BigDecimal) column.defaultValue() : row.decimalValue(mappedId, scale);
     }
 
     /** {@inheritDoc} */
@@ -324,13 +333,13 @@
     public BigInteger numberValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.NUMBER != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (BigInteger) column.defaultValue() : super.numberValue(mappedId);
+        return mappedId < 0 ? (BigInteger) column.defaultValue() : row.numberValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -338,13 +347,13 @@
     public String stringValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.STRING != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (String) column.defaultValue() : super.stringValue(mappedId);
+        return mappedId < 0 ? (String) column.defaultValue() : row.stringValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -352,13 +361,13 @@
     public byte[] bytesValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.BYTES != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (byte[]) column.defaultValue() : super.bytesValue(mappedId);
+        return mappedId < 0 ? (byte[]) column.defaultValue() : row.bytesValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -366,13 +375,13 @@
     public UUID uuidValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.UUID != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (UUID) column.defaultValue() : super.uuidValue(mappedId);
+        return mappedId < 0 ? (UUID) column.defaultValue() : row.uuidValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -380,13 +389,13 @@
     public BitSet bitmaskValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.BITMASK != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (BitSet) column.defaultValue() : super.bitmaskValue(mappedId);
+        return mappedId < 0 ? (BitSet) column.defaultValue() : row.bitmaskValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -394,13 +403,13 @@
     public LocalDate dateValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.DATE != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (LocalDate) column.defaultValue() : super.dateValue(mappedId);
+        return mappedId < 0 ? (LocalDate) column.defaultValue() : row.dateValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -408,13 +417,13 @@
     public LocalTime timeValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.TIME != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (LocalTime) column.defaultValue() : super.timeValue(mappedId);
+        return mappedId < 0 ? (LocalTime) column.defaultValue() : row.timeValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -422,13 +431,13 @@
     public LocalDateTime dateTimeValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.DATETIME != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (LocalDateTime) column.defaultValue() : super.dateTimeValue(mappedId);
+        return mappedId < 0 ? (LocalDateTime) column.defaultValue() : row.dateTimeValue(mappedId);
     }
 
     /** {@inheritDoc} */
@@ -436,13 +445,13 @@
     public Instant timestampValue(int colIdx) throws InvalidTypeException {
         int mappedId = mapColumn(colIdx);
 
-        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : super.schema().column(mappedId);
+        Column column = mappedId < 0 ? mapper.mappedColumn(colIdx) : row.schema().column(mappedId);
 
         if (NativeTypeSpec.TIMESTAMP != column.type().spec()) {
             throw new SchemaException("Type conversion is not supported yet.");
         }
 
-        return mappedId < 0 ? (Instant) column.defaultValue() : super.timestampValue(mappedId);
+        return mappedId < 0 ? (Instant) column.defaultValue() : row.timestampValue(mappedId);
     }
 
     @Override
@@ -451,9 +460,14 @@
 
         return mappedId < 0
                 ? mapper.mappedColumn(colIdx).defaultValue() == null
-                : super.hasNullValue(mappedId);
+                : row.hasNullValue(mappedId);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public int colocationHash() {
+        return row.colocationHash();
+    }
 
     @Override
     public int elementCount() {
@@ -470,7 +484,6 @@
     /** {@inheritDoc} */
     @Override
     public ByteBuffer byteBuffer() {
-        // TODO: IGNITE-22156 Replace inheritance with delegation and drop this code.
         int size = newBinaryTupleSchema.elementCount();
         var builder = new BinaryTupleBuilder(size);
 
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 51e378a..34894a1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -18,59 +18,26 @@
 package org.apache.ignite.internal.schema.row;
 
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
-import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.lang.InternalTuple;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaAware;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.util.ColocationUtils;
-import org.apache.ignite.internal.util.HashCalculator;
 
 /**
- * Schema-aware row.
- *
- * <p>The class contains non-generic methods to read boxed and unboxed primitives based on the schema column types. Any type conversions
- * and coercions should be implemented outside the row by the key-value or query runtime.
- *
- * <p>When a non-boxed primitive is read from a null column value, it is converted to the primitive type default value.
+ * Schema-aware row interface.
  */
-public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware, InternalTuple, BinaryTupleContainer {
-    /** Schema descriptor. */
-    private final SchemaDescriptor schema;
-
-    /** Binary row. */
-    private final BinaryRow row;
-
-    private final BinaryTupleSchema binaryTupleSchema;
-
-    private final boolean keyOnly;
-
-    /** Cached colocation hash value. */
-    private int colocationHash;
-
-    protected Row(boolean keyOnly, SchemaDescriptor schema, BinaryTupleSchema binaryTupleSchema, BinaryRow row) {
-        super(binaryTupleSchema.elementCount(), row.tupleSlice());
-
-        this.keyOnly = keyOnly;
-        this.row = row;
-        this.schema = schema;
-        this.binaryTupleSchema = binaryTupleSchema;
-    }
-
+public interface Row extends SchemaAware, BinaryRowEx, InternalTuple, BinaryTupleContainer {
     /**
      * Creates a row from a given {@code BinaryRow}.
      *
      * @param schema Schema.
      * @param binaryRow Binary row.
      */
-    public static Row wrapBinaryRow(SchemaDescriptor schema, BinaryRow binaryRow) {
-        return new Row(false, schema, BinaryTupleSchema.createRowSchema(schema), binaryRow);
+    static Row wrapBinaryRow(SchemaDescriptor schema, BinaryRow binaryRow) {
+        return new RowImpl(false, schema, BinaryTupleSchema.createRowSchema(schema), binaryRow);
     }
 
     /**
@@ -79,106 +46,25 @@
      * @param schema Schema.
      * @param binaryRow Binary row.
      */
-    public static Row wrapKeyOnlyBinaryRow(SchemaDescriptor schema, BinaryRow binaryRow) {
-        return new Row(true, schema, BinaryTupleSchema.createKeySchema(schema), binaryRow);
+    static Row wrapKeyOnlyBinaryRow(SchemaDescriptor schema, BinaryRow binaryRow) {
+        return new RowImpl(true, schema, BinaryTupleSchema.createKeySchema(schema), binaryRow);
     }
 
+    /** Short-cut method that reads decimal value with a scale from the schema. */
+    BigDecimal decimalValue(int col);
+
     /**
-     * Get row schema.
+     * Reads value for specified column.
+     *
+     * @param colIdx Column index.
+     * @return Column value.
      */
-    @Override
-    public SchemaDescriptor schema() {
-        return schema;
-    }
+    Object value(int colIdx);
 
     /**
      * Gets a value indicating whether the row contains only key columns.
      *
      * @return {@code true} if the row contains only key columns.
      */
-    public boolean keyOnly() {
-        return keyOnly;
-    }
-
-    /**
-     * Reads value for specified column.
-     *
-     * @param col Column index.
-     * @return Column value.
-     */
-    public Object value(int col) {
-        return binaryTupleSchema.value(this, col);
-    }
-
-    public BigDecimal decimalValue(int col) {
-        return binaryTupleSchema.decimalValue(this, col);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int schemaVersion() {
-        return row.schemaVersion();
-    }
-
-    @Override
-    public ByteBuffer tupleSlice() {
-        return row.tupleSlice();
-    }
-
-    @Override
-    public int tupleSliceLength() {
-        return row.tupleSliceLength();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int colocationHash() {
-        int h0 = colocationHash;
-
-        if (h0 == 0) {
-            HashCalculator hashCalc = new HashCalculator();
-
-            for (Column c : schema.colocationColumns()) {
-                int idx = keyOnly
-                        ? c.positionInKey()
-                        : c.positionInRow();
-
-                assert idx >= 0 : c;
-
-                ColocationUtils.append(hashCalc, value(idx), c.type());
-            }
-
-            colocationHash = h0 = hashCalc.hash();
-        }
-
-        return h0;
-    }
-
-    @Override
-    public BinaryTuple binaryTuple() {
-        return new BinaryTuple(binaryTupleSchema.elementCount(), row.tupleSlice());
-    }
-
-    public BinaryTupleSchema binaryTupleSchema() {
-        return binaryTupleSchema;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        Row row1 = (Row) o;
-
-        return row.equals(row1.row);
-    }
-
-    @Override
-    public int hashCode() {
-        return row.hashCode();
-    }
+    boolean keyOnly();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowImpl.java
new file mode 100644
index 0000000..592065f
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/RowImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.row;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.util.ColocationUtils;
+import org.apache.ignite.internal.util.HashCalculator;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Schema-aware row.
+ *
+ * <p>The class contains non-generic methods to read boxed and unboxed primitives based on the schema column types. Any type conversions
+ * and coercions should be implemented outside the row by the key-value or query runtime.
+ *
+ * <p>When a non-boxed primitive is read from a null column value, it is converted to the primitive type default value.
+ */
+public class RowImpl extends BinaryTupleReader implements Row, BinaryRowEx {
+    /** Schema descriptor. */
+    private final SchemaDescriptor schema;
+
+    /** Binary row. */
+    private final BinaryRow row;
+
+    private final BinaryTupleSchema binaryTupleSchema;
+
+    private final boolean keyOnly;
+
+    /** Cached colocation hash value. */
+    private int colocationHash;
+
+    RowImpl(boolean keyOnly, SchemaDescriptor schema, BinaryTupleSchema binaryTupleSchema, BinaryRow row) {
+        super(binaryTupleSchema.elementCount(), row.tupleSlice());
+
+        this.keyOnly = keyOnly;
+        this.row = row;
+        this.schema = schema;
+        this.binaryTupleSchema = binaryTupleSchema;
+    }
+
+    /**
+     * Get row schema.
+     */
+    @Override
+    public SchemaDescriptor schema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean keyOnly() {
+        return keyOnly;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Object value(int col) {
+        return binaryTupleSchema.value(this, col);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable BigDecimal decimalValue(int col) {
+        return binaryTupleSchema.decimalValue(this, col);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int schemaVersion() {
+        return row.schemaVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer tupleSlice() {
+        return row.tupleSlice();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int tupleSliceLength() {
+        return row.tupleSliceLength();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int colocationHash() {
+        int h0 = colocationHash;
+
+        if (h0 == 0) {
+            HashCalculator hashCalc = new HashCalculator();
+
+            for (Column c : schema.colocationColumns()) {
+                int idx = keyOnly
+                        ? c.positionInKey()
+                        : c.positionInRow();
+
+                assert idx >= 0 : c;
+
+                ColocationUtils.append(hashCalc, value(idx), c.type());
+            }
+
+            colocationHash = h0 = hashCalc.hash();
+        }
+
+        return h0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BinaryTupleReader binaryTuple() {
+        return new BinaryTuple(binaryTupleSchema.elementCount(), row.tupleSlice());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RowImpl row1 = (RowImpl) o;
+
+        return row.equals(row1.row);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+        return row.hashCode();
+    }
+}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index 181e334..b439af4 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -34,12 +34,18 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
@@ -55,6 +61,7 @@
  * Tests row assembling and reading.
  */
 public class UpgradingRowAdapterTest {
+    public static final String NULL_COLUMN_NAME = "valNullCol";
     /** Random. */
     private Random rnd;
 
@@ -71,15 +78,52 @@
     }
 
     @Test
-    public void testVariousColumnTypes() {
-        SchemaDescriptor schema = new SchemaDescriptor(1,
-                new Column[]{new Column("keyUuidCol", NativeTypes.UUID, false)},
-                new Column[]{
-                        new Column("valBooleanCol", BOOLEAN, true),
+    public void testUpgradeRowWithVariousColumnTypes() {
+        SchemaDescriptor schema = createSchemaDescriptorWithColumnsOfAllTypes();
+        SchemaDescriptor schema2 = applyAddingValueColumn(schema, 1, new Column("added", INT8, true));
+
+        var schemaRegistry = new SchemaRegistryImpl(
+                v -> v == 1 ? schema : schema2,
+                schema2
+        );
+
+        List<Object> values = generateRowValues(schema);
+        BinaryRow originalBinaryRow = serializeValuesToRow(schema, values);
+
+        Row originalRow = Row.wrapBinaryRow(schema, originalBinaryRow);
+        Row resolvedRow = schemaRegistry.resolve(originalBinaryRow, schema2);
+
+        // validate UpgradedRowAdapter methods.
+        assertThat("Colocation hash mismatch", resolvedRow.colocationHash(), equalTo(originalRow.colocationHash()));
+        assertThat("KeyOnly flag mismatch", resolvedRow.keyOnly(), equalTo(originalRow.keyOnly()));
+        assertThat("Unexpected element count", resolvedRow.elementCount(), equalTo(originalRow.elementCount() + 1));
+
+        assertNotNull(resolvedRow.byteBuffer());
+        assertNull(resolvedRow.binaryTuple(), "Underlying binary tuple must never be used");
+        assertThrows(UnsupportedOperationException.class, resolvedRow::tupleSlice, "Underlying binary tuple must never be used");
+        assertThrows(UnsupportedOperationException.class, resolvedRow::tupleSliceLength, "Underlying binary tuple must never be used");
+
+        // Validate original row.
+        validateRow(values, originalRow);
+
+        // Validate upgraded row.
+        values.add(1, null);
+        validateRow(values, resolvedRow);
+
+        BinaryRowImpl restoredRow = new BinaryRowImpl(schema2.version(), resolvedRow.byteBuffer());
+        assertThat(restoredRow.schemaVersion(), equalTo(schema2.version()));
+        validateRow(values, Row.wrapBinaryRow(schema2, restoredRow));
+    }
+
+    private static SchemaDescriptor createSchemaDescriptorWithColumnsOfAllTypes() {
+        return new SchemaDescriptor(1,
+                List.of(new Column("valBooleanCol", BOOLEAN, true),
                         new Column("valByteCol", INT8, true),
                         new Column("valShortCol", INT16, true),
                         new Column("valIntCol", INT32, true),
+                        new Column("keyUuidCol", NativeTypes.UUID, false),
                         new Column("valLongCol", INT64, true),
+                        new Column(NULL_COLUMN_NAME, INT64, true),
                         new Column("valFloatCol", FLOAT, true),
                         new Column("valDoubleCol", DOUBLE, true),
                         new Column("valDateCol", DATE, true),
@@ -90,44 +134,34 @@
                         new Column("valBytesCol", BYTES, false),
                         new Column("valStringCol", STRING, false),
                         new Column("valNumberCol", NativeTypes.numberOf(20), false),
-                        new Column("valDecimalCol", NativeTypes.decimalOf(25, 5), false),
-                }
+                        new Column("valDecimalCol", NativeTypes.decimalOf(25, 5), false)),
+                List.of("keyUuidCol"),
+                null
+        );
+    }
+
+    private static SchemaDescriptor applyAddingValueColumn(SchemaDescriptor desc, int position, Column newColumn) {
+        List<Column> columns = new ArrayList<>(desc.columns());
+        columns.add(position, newColumn);
+
+        SchemaDescriptor newSchema = new SchemaDescriptor(
+                desc.version() + 1,
+                columns,
+                desc.keyColumns().stream().map(Column::name).collect(Collectors.toList()),
+                desc.colocationColumns().stream().map(Column::name).collect(Collectors.toList())
         );
 
-        SchemaDescriptor schema2 = new SchemaDescriptor(2,
-                new Column[]{new Column("keyUuidCol", NativeTypes.UUID, false)},
-                new Column[]{
-                        new Column("added", INT8, true),
-                        new Column("valBooleanCol", BOOLEAN, true),
-                        new Column("valByteCol", INT8, true),
-                        new Column("valShortCol", INT16, true),
-                        new Column("valIntCol", INT32, true),
-                        new Column("valLongCol", INT64, true),
-                        new Column("valFloatCol", FLOAT, true),
-                        new Column("valDoubleCol", DOUBLE, true),
-                        new Column("valDateCol", DATE, true),
-                        new Column("valTimeCol", time(0), true),
-                        new Column("valDateTimeCol", datetime(6), true),
-                        new Column("valTimeStampCol", timestamp(6), true),
-                        new Column("valBitmask1Col", NativeTypes.bitmaskOf(22), true),
-                        new Column("valBytesCol", BYTES, false),
-                        new Column("valStringCol", STRING, false),
-                        new Column("valNumberCol", NativeTypes.numberOf(20), false),
-                        new Column("valDecimalCol", NativeTypes.decimalOf(25, 5), false),
-                }
-        );
+        int addedColumnIndex = newSchema.column(newColumn.name()).positionInRow();
 
-        int addedColumnIndex = schema2.column("added").positionInRow();
-
-        schema2.columnMapping(new ColumnMapper() {
+        newSchema.columnMapping(new ColumnMapper() {
             @Override
             public ColumnMapper add(Column col) {
-                return null;
+                return fail();
             }
 
             @Override
             public ColumnMapper add(int from, int to) {
-                return null;
+                return fail();
             }
 
             @Override
@@ -137,41 +171,20 @@
 
             @Override
             public Column mappedColumn(int idx) {
-                return idx == addedColumnIndex ? schema2.column(idx) : null;
+                return idx == addedColumnIndex ? newSchema.column(idx) : null;
             }
         });
 
-        List<Object> values = generateRowValues(schema);
-
-        BinaryRow row = serializeValuesToRow(schema, values);
-
-        var schemaRegistry = new SchemaRegistryImpl(
-                v -> v == 1 ? schema : schema2,
-                schema
-        );
-
-        // Validate row.
-        validateRow(values, schemaRegistry, row);
-
-        // Validate upgraded row.
-        values.add(addedColumnIndex, null);
-
-        var schema2Registry = new SchemaRegistryImpl(
-                v -> v == 1 ? schema : schema2,
-                schema2
-        );
-
-        validateRow(values, schema2Registry, row);
+        return newSchema;
     }
 
-    private void validateRow(List<Object> values, SchemaRegistryImpl schemaRegistry, BinaryRow binaryRow) {
-        Row row = schemaRegistry.resolve(binaryRow, schemaRegistry.lastKnownSchemaVersion());
-
+    private static void validateRow(List<Object> values, Row row) {
         SchemaDescriptor schema = row.schema();
 
         for (int i = 0; i < values.size(); i++) {
             Column col = schema.column(i);
 
+            assertThat("Failed for column: " + col, row.hasNullValue(col.positionInRow()), is(equalTo(values.get(i) == null)));
             assertThat("Failed for column: " + col, row.value(col.positionInRow()), is(equalTo(values.get(i))));
         }
     }
@@ -188,7 +201,11 @@
         for (int i = 0; i < schema.length(); i++) {
             NativeType type = schema.column(i).type();
 
-            res.add(SchemaTestUtils.generateRandomValue(rnd, type));
+            if (NULL_COLUMN_NAME.equals(schema.column(i).name())) {
+                res.add(null);
+            } else {
+                res.add(SchemaTestUtils.generateRandomValue(rnd, type));
+            }
         }
 
         return res;
@@ -198,11 +215,11 @@
      * Validates row values after serialization-then-deserialization.
      *
      * @param schema Row schema.
-     * @param vals   Row values.
+     * @param vals Row values.
      * @return Row bytes.
      */
     private static BinaryRow serializeValuesToRow(SchemaDescriptor schema, List<Object> vals) {
-        assertEquals(schema.keyColumns().size() + schema.valueColumns().size(), vals.size());
+        assertEquals(schema.columns().size(), vals.size());
 
         RowAssembler asm = new RowAssembler(schema, -1);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/MutableRowTupleAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/MutableRowTupleAdapter.java
index 931ebe9..26a7537 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/MutableRowTupleAdapter.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/MutableRowTupleAdapter.java
@@ -27,7 +27,7 @@
 import java.util.Iterator;
 import java.util.UUID;
 import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
-import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.table.Tuple;
@@ -299,7 +299,7 @@
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryTuple binaryTuple() {
+    public @Nullable BinaryTupleReader binaryTuple() {
         return row == null ? null : row.binaryTuple();
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 711d12c..f623b06 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -432,7 +432,7 @@
             List<BinaryRowEx> rows = new ArrayList<>(recs.size());
 
             for (R rec : recs) {
-                Row row = marsh.marshal(Objects.requireNonNull(rec));
+                BinaryRowEx row = marsh.marshal(Objects.requireNonNull(rec));
 
                 rows.add(row);
             }
@@ -451,7 +451,7 @@
 
             for (R rec : recs) {
                 boolean isDeleted = deleted != null && deleted.get(rows.size());
-                Row row = isDeleted ? marsh.marshalKey(rec) : marsh.marshal(rec);
+                BinaryRowEx row = isDeleted ? marsh.marshalKey(rec) : marsh.marshal(rec);
 
                 rows.add(row);
             }
@@ -493,7 +493,7 @@
             List<BinaryRowEx> rows = new ArrayList<>(recs.size());
 
             for (R rec : recs) {
-                Row row = marsh.marshalKey(Objects.requireNonNull(rec));
+                BinaryRowEx row = marsh.marshalKey(Objects.requireNonNull(rec));
 
                 rows.add(row);
             }