blob: 8a85c678df36e1d850127d2733b35a580e14d703 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fury;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Data;
import org.apache.fury.config.Language;
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.apache.fury.serializer.BufferObject;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.util.MurmurHash3;
import org.testng.Assert;
import org.testng.annotations.Test;
/** Tests in this class need fury python installed. */
@Test
public class CrossLanguageTest {
private static final Logger LOG = LoggerFactory.getLogger(CrossLanguageTest.class);
private static final String PYTHON_MODULE = "pyfury.tests.test_cross_language";
private static final String PYTHON_EXECUTABLE = "python";
/**
* Execute an external command.
*
* @return Whether the command succeeded.
*/
private boolean executeCommand(List<String> command, int waitTimeoutSeconds) {
return executeCommand(
command, waitTimeoutSeconds, ImmutableMap.of("ENABLE_CROSS_LANGUAGE_TESTS", "true"));
}
private boolean executeCommand(
List<String> command, int waitTimeoutSeconds, Map<String, String> env) {
try {
LOG.info("Executing command: {}", String.join(" ", command));
ProcessBuilder processBuilder =
new ProcessBuilder(command)
.redirectOutput(ProcessBuilder.Redirect.INHERIT)
.redirectError(ProcessBuilder.Redirect.INHERIT);
for (Map.Entry<String, String> entry : env.entrySet()) {
processBuilder.environment().put(entry.getKey(), entry.getValue());
}
Process process = processBuilder.start();
process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS);
return process.exitValue() == 0;
} catch (Exception e) {
throw new RuntimeException("Error executing command " + String.join(" ", command), e);
}
}
@Data
public static class A {
public Integer f1;
public Map<String, String> f2;
public static A create() {
A a = new A();
a.f1 = 1;
a.f2 = new HashMap<>();
a.f2.put("pid", "12345");
a.f2.put("ip", "0.0.0.0");
a.f2.put("k1", "v1");
return a;
}
}
@Test
public void testBuffer() throws IOException {
MemoryBuffer buffer = MemoryUtils.buffer(32);
buffer.writeBoolean(true);
buffer.writeByte(Byte.MAX_VALUE);
buffer.writeInt16(Short.MAX_VALUE);
buffer.writeInt32(Integer.MAX_VALUE);
buffer.writeInt64(Long.MAX_VALUE);
buffer.writeFloat32(-1.1f);
buffer.writeFloat64(-1.1);
buffer.writeVarUint32(100);
byte[] bytes = {'a', 'b'};
buffer.writeInt32(bytes.length);
buffer.writeBytes(bytes);
Path dataFile = Files.createTempFile("test_buffer", "data");
Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_buffer",
dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
buffer = MemoryUtils.wrap(Files.readAllBytes(dataFile));
Assert.assertTrue(buffer.readBoolean());
Assert.assertEquals(buffer.readByte(), Byte.MAX_VALUE);
Assert.assertEquals(buffer.readInt16(), Short.MAX_VALUE);
Assert.assertEquals(buffer.readInt32(), Integer.MAX_VALUE);
Assert.assertEquals(buffer.readInt64(), Long.MAX_VALUE);
Assert.assertEquals(buffer.readFloat32(), -1.1f, 0.0001);
Assert.assertEquals(buffer.readFloat64(), -1.1, 0.0001);
Assert.assertEquals(buffer.readVarUint32(), 100);
Assert.assertTrue(Arrays.equals(buffer.readBytes(buffer.readInt32()), bytes));
}
@Test
public void testMurmurHash3() throws IOException {
MemoryBuffer buffer = MemoryUtils.buffer(32);
byte[] hash1 = Hashing.murmur3_128(47).hashBytes(new byte[] {1, 2, 8}).asBytes();
buffer.writeBytes(hash1);
byte[] hash2 =
Hashing.murmur3_128(47)
.hashBytes("01234567890123456789".getBytes(StandardCharsets.UTF_8))
.asBytes();
buffer.writeBytes(hash2);
Path dataFile = Files.createTempFile("test_murmurhash3", "data");
Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_murmurhash3",
dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
long[] longs = MurmurHash3.murmurhash3_x64_128(new byte[] {1, 2, 8}, 0, 3, 47);
buffer.writerIndex(0);
buffer.writeInt64(longs[0]);
buffer.writeInt64(longs[1]);
Files.write(
dataFile, buffer.getBytes(0, buffer.writerIndex()), StandardOpenOption.TRUNCATE_EXISTING);
Assert.assertTrue(executeCommand(command, 30));
}
/** Keep this in sync with `foo_schema` in test_cross_language.py */
@Data
public static class Foo {
public Integer f1;
public String f2;
public List<String> f3;
public Map<String, Integer> f4;
public Bar f5;
public static Foo create() {
Foo foo = new Foo();
foo.f1 = 1;
foo.f2 = "str";
foo.f3 = Arrays.asList("str1", null, "str2");
foo.f4 =
new HashMap<String, Integer>() {
{
put("k1", 1);
put("k2", 2);
put("k3", 3);
put("k4", 4);
put("k5", 5);
put("k6", 6);
}
};
foo.f5 = Bar.create();
return foo;
}
}
/** Keep this in sync with `bar_schema` in test_cross_language.py */
@Data
public static class Bar {
public Integer f1;
public String f2;
public static Bar create() {
Bar bar = new Bar();
bar.f1 = 1;
bar.f2 = "str";
return bar;
}
}
@Test
public void testCrossLanguageSerializer() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
MemoryBuffer buffer = MemoryUtils.buffer(32);
fury.serialize(buffer, true);
fury.serialize(buffer, false);
fury.serialize(buffer, -1);
fury.serialize(buffer, Byte.MAX_VALUE);
fury.serialize(buffer, Byte.MIN_VALUE);
fury.serialize(buffer, Short.MAX_VALUE);
fury.serialize(buffer, Short.MIN_VALUE);
fury.serialize(buffer, Integer.MAX_VALUE);
fury.serialize(buffer, Integer.MIN_VALUE);
fury.serialize(buffer, Long.MAX_VALUE);
fury.serialize(buffer, Long.MIN_VALUE);
fury.serialize(buffer, -1.f);
fury.serialize(buffer, -1.d);
fury.serialize(buffer, "str");
LocalDate day = LocalDate.of(2021, 11, 23);
fury.serialize(buffer, day);
Instant instant = Instant.ofEpochSecond(100);
fury.serialize(buffer, instant);
List<Object> list = Arrays.asList("a", 1, -1.0, instant, day);
fury.serialize(buffer, list);
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < list.size(); i++) {
map.put("k" + i, list.get(i));
map.put(list.get(i), list.get(i));
}
fury.serialize(buffer, map);
Set<Object> set = new HashSet<>(list);
fury.serialize(buffer, set);
// test primitive arrays
fury.serialize(buffer, new boolean[] {true, false});
fury.serialize(buffer, new short[] {1, Short.MAX_VALUE});
fury.serialize(buffer, new int[] {1, Integer.MAX_VALUE});
fury.serialize(buffer, new long[] {1, Long.MAX_VALUE});
fury.serialize(buffer, new float[] {1.f, 2.f});
fury.serialize(buffer, new double[] {1.0, 2.0});
BiConsumer<MemoryBuffer, Boolean> function =
(MemoryBuffer buf, Boolean useToString) -> {
assertStringEquals(fury.deserialize(buf), true, useToString);
assertStringEquals(fury.deserialize(buf), false, useToString);
assertStringEquals(fury.deserialize(buf), -1, useToString);
assertStringEquals(fury.deserialize(buf), Byte.MAX_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Byte.MIN_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Short.MAX_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Short.MIN_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Integer.MAX_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Integer.MIN_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Long.MAX_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), Long.MIN_VALUE, useToString);
assertStringEquals(fury.deserialize(buf), -1.f, useToString);
assertStringEquals(fury.deserialize(buf), -1.d, useToString);
assertStringEquals(fury.deserialize(buf), "str", useToString);
assertStringEquals(fury.deserialize(buf), day, useToString);
assertStringEquals(fury.deserialize(buf), instant, useToString);
assertStringEquals(fury.deserialize(buf), list, useToString);
assertStringEquals(fury.deserialize(buf), map, useToString);
assertStringEquals(fury.deserialize(buf), set, useToString);
assertStringEquals(fury.deserialize(buf), new boolean[] {true, false}, false);
assertStringEquals(fury.deserialize(buf), new short[] {1, Short.MAX_VALUE}, false);
assertStringEquals(fury.deserialize(buf), new int[] {1, Integer.MAX_VALUE}, false);
assertStringEquals(fury.deserialize(buf), new long[] {1, Long.MAX_VALUE}, false);
assertStringEquals(fury.deserialize(buf), new float[] {1.f, 2.f}, false);
assertStringEquals(fury.deserialize(buf), new double[] {1.0, 2.0}, false);
};
function.accept(buffer, false);
Path dataFile = Files.createTempFile("test_cross_language_serializer", "data");
// Files.deleteIfExists(Paths.get("test_cross_language_serializer.data"));
// Path dataFile = Files.createFile(Paths.get("test_cross_language_serializer.data"));
Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_cross_language_serializer",
dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
MemoryBuffer buffer2 = MemoryUtils.wrap(Files.readAllBytes(dataFile));
function.accept(buffer2, true);
}
@Test
public void testCrossLanguagePreserveTypes() {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
Assert.assertEquals(
new String[] {"str", "str"}, (Object[]) serDe(fury, new String[] {"str", "str"}));
Assert.assertEquals(new Object[] {"str", 1}, (Object[]) serDe(fury, new Object[] {"str", 1}));
Assert.assertTrue(
Arrays.deepEquals(
new Integer[][] {{1, 2}, {1, 2}},
(Integer[][]) serDe(fury, new Integer[][] {{1, 2}, {1, 2}})));
Assert.assertEquals(Arrays.asList(1, 2), serDe(fury, Arrays.asList(1, 2)));
List<String> arrayList = Arrays.asList("str", "str");
Assert.assertEquals(arrayList, serDe(fury, arrayList));
Assert.assertEquals(new LinkedList<>(arrayList), serDe(fury, new LinkedList<>(arrayList)));
Assert.assertEquals(new HashSet<>(arrayList), serDe(fury, new HashSet<>(arrayList)));
TreeSet<String> treeSet = new TreeSet<>(Comparator.naturalOrder());
treeSet.add("str1");
treeSet.add("str2");
Assert.assertEquals(treeSet, serDe(fury, treeSet));
HashMap<String, Integer> hashMap = new HashMap<>();
hashMap.put("k1", 1);
hashMap.put("k2", 2);
Assert.assertEquals(hashMap, serDe(fury, hashMap));
Assert.assertEquals(new LinkedHashMap<>(hashMap), serDe(fury, new LinkedHashMap<>(hashMap)));
Assert.assertEquals(Collections.EMPTY_LIST, serDe(fury, Collections.EMPTY_LIST));
Assert.assertEquals(Collections.EMPTY_SET, serDe(fury, Collections.EMPTY_SET));
Assert.assertEquals(Collections.EMPTY_MAP, serDe(fury, Collections.EMPTY_MAP));
Assert.assertEquals(
Collections.singletonList("str"), serDe(fury, Collections.singletonList("str")));
Assert.assertEquals(Collections.singleton("str"), serDe(fury, Collections.singleton("str")));
Assert.assertEquals(
Collections.singletonMap("k", 1), serDe(fury, Collections.singletonMap("k", 1)));
}
@SuppressWarnings("unchecked")
private void assertStringEquals(Object actual, Object expected, boolean useToString) {
if (useToString) {
if (expected instanceof Map) {
Map actualMap =
(Map)
((Map) actual)
.entrySet().stream()
.collect(
Collectors.toMap(
(Map.Entry e) -> e.getKey().toString(),
(Map.Entry e) -> e.getValue().toString()));
Map expectedMap =
(Map)
((Map) expected)
.entrySet().stream()
.collect(
Collectors.toMap(
(Map.Entry e) -> e.getKey().toString(),
(Map.Entry e) -> e.getValue().toString()));
Assert.assertEquals(actualMap, expectedMap);
} else if (expected instanceof Set) {
Object actualSet =
((Set) actual).stream().map(Object::toString).collect(Collectors.toSet());
Object expectedSet =
((Set) expected).stream().map(Object::toString).collect(Collectors.toSet());
Assert.assertEquals(actualSet, expectedSet);
} else {
Assert.assertEquals(actual.toString(), expected.toString());
}
} else {
Assert.assertEquals(actual, expected);
}
}
private Object serDe(Fury fury, Object obj) {
byte[] bytes = fury.serialize(obj);
return fury.deserialize(bytes);
}
@SuppressWarnings("unchecked")
@Test
public void testCrossLanguageReference() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
List<Object> list = new ArrayList<>();
Map<Object, Object> map = new HashMap<>();
list.add(list);
list.add(map);
map.put("k1", map);
map.put("k2", list);
MemoryBuffer buffer = MemoryUtils.buffer(32);
fury.serialize(buffer, list);
Consumer<MemoryBuffer> function =
(MemoryBuffer buf) -> {
List<Object> newList = (List<Object>) fury.deserialize(buf);
Assert.assertNotNull(newList);
Assert.assertSame(newList, newList.get(0));
Map<Object, Object> newMap = (Map<Object, Object>) newList.get(1);
Assert.assertSame(newMap.get("k1"), newMap);
Assert.assertSame(newMap.get("k2"), newList);
};
Path dataFile = Files.createTempFile("test_cross_language_reference", "data");
// Files.deleteIfExists(Paths.get("test_cross_language_reference.data"));
// Path dataFile = Files.createFile(Paths.get("test_cross_language_reference.data"));
Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_cross_language_reference",
dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
MemoryBuffer buffer2 = MemoryUtils.wrap(Files.readAllBytes(dataFile));
function.accept(buffer2);
}
@Data
public static class ComplexObject1 {
Object f1;
String f2;
List<Object> f3;
Map<Byte, Integer> f4;
Byte f5;
Short f6;
Integer f7;
Long f8;
Float f9;
Double f10;
short[] f11;
List<Short> f12;
}
@Data
public static class ComplexObject2 {
Object f1;
Map<Byte, Integer> f2;
}
@Test
public void testSerializeSimpleStruct() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
fury.register(ComplexObject2.class, "test.ComplexObject2");
ComplexObject2 obj2 = new ComplexObject2();
obj2.f1 = true;
obj2.f2 = new HashMap<>(ImmutableMap.of((byte) -1, 2));
structRoundBack(fury, obj2, "test_serialize_simple_struct");
}
public void testSerializeComplexStruct() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
fury.register(ComplexObject1.class, "test.ComplexObject1");
fury.register(ComplexObject2.class, "test.ComplexObject2");
ComplexObject2 obj2 = new ComplexObject2();
obj2.f1 = true;
obj2.f2 = ImmutableMap.of((byte) -1, 2);
ComplexObject1 obj = new ComplexObject1();
obj.f1 = obj2;
obj.f2 = "abc";
obj.f3 = Arrays.asList("abc", "abc");
obj.f4 = ImmutableMap.of((byte) 1, 2);
obj.f5 = Byte.MAX_VALUE;
obj.f6 = Short.MAX_VALUE;
obj.f7 = Integer.MAX_VALUE;
obj.f8 = Long.MAX_VALUE;
obj.f9 = 1.0f / 2;
obj.f10 = 1 / 3.0;
obj.f11 = new short[] {(short) 1, (short) 2};
obj.f12 = ImmutableList.of((short) -1, (short) 4);
structRoundBack(fury, obj, "test_serialize_complex_struct");
}
private void structRoundBack(Fury fury, Object obj, String testName) throws IOException {
byte[] serialized = fury.serialize(obj);
Assert.assertEquals(fury.deserialize(serialized), obj);
Path dataFile = Paths.get(testName);
System.out.println(dataFile.toAbsolutePath());
Files.deleteIfExists(dataFile);
Files.write(dataFile, serialized);
dataFile.toFile().deleteOnExit();
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE, "-m", PYTHON_MODULE, testName, dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
Assert.assertEquals(fury.deserialize(Files.readAllBytes(dataFile)), obj);
}
@Test
public void testSerializeOpaqueObjectSimple() {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
fury.register(ComplexObject2.class, "test.ComplexObject2");
ComplexObject2 obj = new ComplexObject2();
obj.f1 = Foo.create();
byte[] serialized = fury.serialize(obj);
Assert.assertEquals(fury.deserialize(serialized), obj);
}
@Test
public void testSerializeOpaqueObject() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
fury.register(ComplexObject1.class, "test.ComplexObject1");
// don't register ComplexObject2/Foo to make them serialize as opaque blobs.
ComplexObject1 obj = new ComplexObject1();
obj.f1 = new ComplexObject2();
((ComplexObject2) obj.f1).f1 = true;
((ComplexObject2) obj.f1).f2 = new HashMap<>(ImmutableMap.of((byte) -1, 2));
obj.f2 = "abc";
obj.f3 = Arrays.asList(obj.f1, Foo.create());
byte[] serialized = fury.serialize(obj);
Assert.assertEquals(fury.deserialize(serialized), obj);
Path dataFile = Files.createTempFile("test_serialize_opaque_object", "data");
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_serialize_opaque_object",
dataFile.toAbsolutePath().toString());
Files.write(dataFile, serialized);
Assert.assertTrue(executeCommand(command, 30));
// TODO support OpaqueObject itself serialization
// ComplexObject1 newObj = (ComplexObject1) fury.deserialize(Files.readAllBytes(dataFile));
// assertEquals(newObj.f1.getClass(), OpaqueObjects.OpaqueObject.class);
// assertEquals(newObj.f2, obj.f2);
// assertNotNull(newObj.f3);
// assertTrue(newObj.f3.get(0) instanceof OpaqueObjects.OpaqueObject);
// assertTrue(newObj.f3.get(1) instanceof OpaqueObjects.OpaqueObject);
}
private static class ComplexObject1Serializer extends Serializer<ComplexObject1> {
public ComplexObject1Serializer(Fury fury, Class<ComplexObject1> cls) {
super(fury, cls);
}
@Override
public void write(MemoryBuffer buffer, ComplexObject1 value) {
xwrite(buffer, value);
}
@Override
public ComplexObject1 read(MemoryBuffer buffer) {
return xread(buffer);
}
@Override
public short getXtypeId() {
return Fury.FURY_TYPE_TAG_ID;
}
@Override
public String getCrossLanguageTypeTag() {
return "test.ComplexObject1";
}
@Override
public void xwrite(MemoryBuffer buffer, ComplexObject1 value) {
fury.xwriteRef(buffer, value.f1);
fury.xwriteRef(buffer, value.f2);
fury.xwriteRef(buffer, value.f3);
}
@SuppressWarnings("unchecked")
@Override
public ComplexObject1 xread(MemoryBuffer buffer) {
ComplexObject1 obj = new ComplexObject1();
fury.getRefResolver().reference(obj);
obj.f1 = fury.xreadRef(buffer);
obj.f2 = (String) fury.xreadRef(buffer);
obj.f3 = (List<Object>) fury.xreadRef(buffer);
return obj;
}
}
@Test
public void testRegisterSerializer() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
fury.registerSerializer(ComplexObject1.class, ComplexObject1Serializer.class);
ComplexObject1 obj = new ComplexObject1();
obj.f1 = true;
obj.f2 = "abc";
obj.f3 = Arrays.asList("abc", "abc");
byte[] serialized = fury.serialize(obj);
Assert.assertEquals(fury.deserialize(serialized), obj);
Path dataFile = Files.createTempFile("test_register_serializer", "data");
Files.write(dataFile, serialized);
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_register_serializer",
dataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
Assert.assertEquals(fury.deserialize(Files.readAllBytes(dataFile)), obj);
}
@Test
public void testOutOfBandBuffer() throws Exception {
Fury fury =
Fury.builder()
.withLanguage(Language.XLANG)
.withRefTracking(true)
.requireClassRegistration(false)
.build();
AtomicInteger counter = new AtomicInteger(0);
List<byte[]> data =
IntStream.range(0, 10).mapToObj(i -> new byte[] {0, 1}).collect(Collectors.toList());
List<BufferObject> bufferObjects = new ArrayList<>();
byte[] serialized =
fury.serialize(
data,
o -> {
if (counter.incrementAndGet() % 2 == 0) {
bufferObjects.add(o);
return false;
} else {
return true;
}
});
List<MemoryBuffer> buffers =
bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
List<byte[]> newObj = (List<byte[]>) fury.deserialize(serialized, buffers);
for (int i = 0; i < data.size(); i++) {
Assert.assertEquals(data.get(i), newObj.get(i));
}
Path intBandDataFile = Files.createTempFile("test_oob_buffer_in_band", "data");
Files.write(intBandDataFile, serialized);
Path outOfBandDataFile = Files.createTempFile("test_oob_buffer_out_of_band", "data");
Files.deleteIfExists(outOfBandDataFile);
Files.createFile(outOfBandDataFile);
MemoryBuffer outOfBandBuffer = MemoryBuffer.newHeapBuffer(32);
outOfBandBuffer.writeInt32(buffers.size());
for (int i = 0; i < buffers.size(); i++) {
outOfBandBuffer.writeInt32(bufferObjects.get(i).totalBytes());
bufferObjects.get(i).writeTo(outOfBandBuffer);
}
Files.write(outOfBandDataFile, outOfBandBuffer.getBytes(0, outOfBandBuffer.writerIndex()));
ImmutableList<String> command =
ImmutableList.of(
PYTHON_EXECUTABLE,
"-m",
PYTHON_MODULE,
"test_oob_buffer",
intBandDataFile.toAbsolutePath().toString(),
outOfBandDataFile.toAbsolutePath().toString());
Assert.assertTrue(executeCommand(command, 30));
MemoryBuffer inBandBuffer = MemoryUtils.wrap(Files.readAllBytes(intBandDataFile));
outOfBandBuffer = MemoryUtils.wrap(Files.readAllBytes(outOfBandDataFile));
int numBuffers = outOfBandBuffer.readInt32();
buffers = new ArrayList<>();
for (int i = 0; i < numBuffers; i++) {
int len = outOfBandBuffer.readInt32();
int readerIndex = outOfBandBuffer.readerIndex();
buffers.add(outOfBandBuffer.slice(readerIndex, len));
outOfBandBuffer.readerIndex(readerIndex + len);
}
newObj = (List<byte[]>) fury.deserialize(inBandBuffer, buffers);
Assert.assertNotNull(newObj);
for (int i = 0; i < data.size(); i++) {
Assert.assertEquals(data.get(i), newObj.get(i));
}
}
@Data
static class ArrayStruct {
ArrayField[] f1;
}
@Data
static class ArrayField {
public String a;
}
@Test
public void testStructArrayField() {
Fury fury = Fury.builder().withLanguage(Language.XLANG).requireClassRegistration(true).build();
fury.register(ArrayStruct.class, "example.bar");
fury.register(ArrayField.class, "example.foo");
ArrayField a = new ArrayField();
a.a = "123";
ArrayStruct struct = new ArrayStruct();
struct.f1 = new ArrayField[] {a};
Assert.assertEquals(serDe(fury, struct), struct);
}
}