blob: 054df56eb5fe31b32ed5213d3706c11a0a849821 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.command;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.lang.IgniteLogger;
/**
 * Utility class for serializing binary rows to byte arrays and reading them back. It will be removed after another
 * way of serialization is implemented in the network layer.
 * TODO: Remove it after (IGNITE-14793)
 */
public class CommandUtils {
    /** The logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(CommandUtils.class);

    /**
     * Writes a collection of rows to a single byte array and passes the array to the consumer.
     *
     * <p>Every non-null row is written as a 4-byte length (see {@link #intToBytes(int)}) followed by the row bytes;
     * {@code null} elements are skipped. If {@code rows} is {@code null} or empty, the consumer is not invoked.
     * If any row cannot be serialized, the error is logged and the consumer receives {@code null} rather than
     * a partially written batch.
     *
     * @param rows Collection of rows.
     * @param consumer Byte array consumer.
     */
    public static void rowsToBytes(Collection<BinaryRow> rows, Consumer<byte[]> consumer) {
        if (rows == null || rows.isEmpty())
            return;

        byte[] result;

        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            for (BinaryRow row : rows) {
                if (row == null)
                    continue;

                // Serialize directly instead of going through rowToBytes(): its failure path hands null to the
                // callback, which previously ended in a NullPointerException on bytes.length.
                byte[] bytes = serializeRow(row);

                baos.write(intToBytes(bytes.length));
                baos.write(bytes);
            }

            result = baos.toByteArray();
        }
        catch (IOException e) {
            LOG.error("Could not write rows to stream [rows=" + rows.size() + ']', e);

            consumer.accept(null);

            return;
        }

        // Invoked outside of the try block so that the consumer is called exactly once.
        consumer.accept(result);
    }

    /**
     * Writes a row to a byte array and passes the array to the consumer.
     *
     * <p>If {@code row} is {@code null}, the consumer is not invoked. If the row cannot be serialized, the error
     * is logged and the consumer receives {@code null}.
     *
     * @param row Row.
     * @param consumer Byte array consumer.
     */
    public static void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
        if (row == null)
            return;

        byte[] bytes;

        try {
            bytes = serializeRow(row);
        }
        catch (IOException e) {
            LOG.error("Could not write row to stream [row=" + row + ']', e);

            consumer.accept(null);

            return;
        }

        consumer.accept(bytes);
    }

    /**
     * Reads rows from a byte array and passes each of them to the consumer.
     *
     * <p>The array is expected to have the layout produced by {@link #rowsToBytes(Collection, Consumer)}:
     * a sequence of entries, each consisting of a 4-byte length followed by that many row bytes.
     * If {@code bytes} is {@code null} or empty, the consumer is not invoked. Rows that precede a malformed entry
     * have already been passed to the consumer by the time the exception is thrown.
     *
     * @param bytes Byte array.
     * @param consumer Consumer for binary row.
     * @throws IllegalArgumentException If the array is truncated or contains a non-positive row length.
     */
    public static void readRows(byte[] bytes, Consumer<BinaryRow> consumer) {
        if (bytes == null || bytes.length == 0)
            return;

        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
            byte[] lenBytes = new byte[Integer.BYTES];

            int read;

            while ((read = bais.read(lenBytes)) != -1) {
                // Explicit checks instead of asserts: with assertions disabled, malformed input would otherwise
                // be turned into a zero-padded row or an oversized allocation.
                if (read != lenBytes.length) {
                    throw new IllegalArgumentException(
                        "Truncated row length prefix [expected=" + lenBytes.length + ", read=" + read + ']');
                }

                int len = bytesToInt(lenBytes);

                int remaining = bais.available();

                // Validate the length before allocating the row buffer.
                if (len <= 0 || len > remaining) {
                    throw new IllegalArgumentException(
                        "Invalid row length [len=" + len + ", remaining=" + remaining + ']');
                }

                byte[] rowBytes = new byte[len];

                read = bais.read(rowBytes);

                // Internal invariant: the length was validated against the remaining bytes above.
                assert read == len;

                consumer.accept(new ByteBufferRow(rowBytes));
            }
        }
        catch (IOException e) {
            LOG.error("Could not read rows from stream.", e);
        }
    }

    /**
     * Serializes a single row into a new byte array.
     *
     * @param row Row, must not be {@code null}.
     * @return Serialized row bytes.
     * @throws IOException If the row could not be written to the stream.
     */
    private static byte[] serializeRow(BinaryRow row) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            row.writeTo(baos);

            return baos.toByteArray();
        }
    }

    /**
     * Serializes an integer to a 4-byte array.
     *
     * @param i Integer value.
     * @return Byte array.
     */
    private static byte[] intToBytes(int i) {
        byte[] arr = new byte[Integer.BYTES];

        ByteBuffer.wrap(arr).putInt(i);

        return arr;
    }

    /**
     * Deserializes an integer from the first 4 bytes of a byte array.
     *
     * @param bytes Byte array.
     * @return Integer value.
     */
    private static int bytesToInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }
}