blob: 4ecdbc843882794938bd85e396b301b77e29ab3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
/**
*
*/
public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
private static final int BUFF_SIZE = 4 * 1024;
/**
* @throws IOException If failed.
*/
@Test
public void testStreams() throws IOException {
GridUnsafeMemory mem = new GridUnsafeMemory(0);
HadoopDataOutStream out = new HadoopDataOutStream(mem);
final long ptr = mem.allocate(BUFF_SIZE);
out.buffer().set(ptr, BUFF_SIZE);
write(out);
HadoopDataInStream in = new HadoopDataInStream(mem);
in.buffer().set(ptr, out.buffer().pointer() - ptr);
checkRead(in);
}
/**
* @throws IOException If failed.
*/
@Test
public void testDirectStreams() throws IOException {
HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE);
write(out);
byte[] inBuf = Arrays.copyOf(out.buffer(), out.position());
HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf);
checkRead(in);
}
/**
* @throws IOException If failed.
*/
@Test
public void testReadline() throws IOException {
checkReadLine("String1\rString2\r\nString3\nString4");
checkReadLine("String1\rString2\r\nString3\nString4\r\n");
checkReadLine("String1\rString2\r\nString3\nString4\r");
checkReadLine("\nA\rB\r\nC\nD\n");
checkReadLine("\rA\rB\r\nC\nD\n");
checkReadLine("\r\nA\rB\r\nC\nD\n");
checkReadLine("\r\r\nA\r\r\nC\nD\n");
checkReadLine("\r\r\r\n\n\n");
checkReadLine("\r\n");
checkReadLine("\r");
checkReadLine("\n");
}
/**
* @param val String value.
* @throws IOException On error.
*/
private void checkReadLine(String val) throws IOException {
List<String> expected = readLineByDataInputStream(val);
List<String> dataInp = readLineByHadoopDataInStream(val);
List<String> directDataInp = readLineByHadoopDirectDataInput(val);
assertEquals(expected, dataInp);
assertEquals(expected, directDataInp);
}
/**
* @param val String value.
* @return List of strings are returned by readLine().
* @throws IOException On error.
*/
List<String> readLineByDataInputStream(String val) throws IOException {
ByteArrayOutputStream byteArrayOs = new ByteArrayOutputStream();
byteArrayOs.write(val.getBytes());
byteArrayOs.close();
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(byteArrayOs.toByteArray()))) {
return readLineStrings(in);
}
}
/**
* @param val String value.
* @return List of strings are returned by readLine().
* @throws IOException On error.
*/
List<String> readLineByHadoopDataInStream(String val) throws IOException {
GridUnsafeMemory mem = new GridUnsafeMemory(0);
HadoopDataOutStream out = new HadoopDataOutStream(mem);
final long ptr = mem.allocate(BUFF_SIZE);
out.buffer().set(ptr, BUFF_SIZE);
out.write(val.getBytes());
HadoopDataInStream in = new HadoopDataInStream(mem);
in.buffer().set(ptr, out.buffer().pointer() - ptr);
return readLineStrings(in);
}
/**
* @param val String value.
* @return List of strings are returned by readLine().
* @throws IOException On error.
*/
List<String> readLineByHadoopDirectDataInput(String val) throws IOException {
HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE);
out.write(val.getBytes());
byte[] inBuf = Arrays.copyOf(out.buffer(), out.position());
HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf);
return readLineStrings(in);
}
/**
* @param in Data input.
* @return List of strings are returned by readLine().
* @throws IOException On error.
*/
@NotNull private List<String> readLineStrings(DataInput in) throws IOException {
List<String> strs = new ArrayList<>();
for (String str = in.readLine(); str != null; str = in.readLine())
strs.add(str);
return strs;
}
/**
* @param out Data output.
* @throws IOException On error.
*/
private void write(DataOutput out) throws IOException {
out.writeBoolean(false);
out.writeBoolean(true);
out.writeBoolean(false);
out.write(17);
out.write(121);
out.write(0xfafa);
out.writeByte(17);
out.writeByte(121);
out.writeByte(0xfafa);
out.writeChar('z');
out.writeChar('o');
out.writeChar('r');
out.writeShort(100);
out.writeShort(Short.MIN_VALUE);
out.writeShort(Short.MAX_VALUE);
out.writeShort(65535);
out.writeShort(65536); // 0
out.writeInt(Integer.MAX_VALUE);
out.writeInt(Integer.MIN_VALUE);
out.writeInt(-1);
out.writeInt(0);
out.writeInt(1);
out.writeFloat(0.33f);
out.writeFloat(0.5f);
out.writeFloat(-0.7f);
out.writeFloat(Float.MAX_VALUE);
out.writeFloat(Float.MIN_VALUE);
out.writeFloat(Float.MIN_NORMAL);
out.writeFloat(Float.POSITIVE_INFINITY);
out.writeFloat(Float.NEGATIVE_INFINITY);
out.writeFloat(Float.NaN);
out.writeDouble(-12312312.3333333336666779);
out.writeDouble(123123.234);
out.writeDouble(Double.MAX_VALUE);
out.writeDouble(Double.MIN_VALUE);
out.writeDouble(Double.MIN_NORMAL);
out.writeDouble(Double.NEGATIVE_INFINITY);
out.writeDouble(Double.POSITIVE_INFINITY);
out.writeDouble(Double.NaN);
out.writeLong(Long.MAX_VALUE);
out.writeLong(Long.MIN_VALUE);
out.writeLong(0);
out.writeLong(-1L);
out.write(new byte[] {1, 2, 3});
out.write(new byte[] {0, 1, 2, 3}, 1, 2);
out.writeUTF("mom washes rum");
}
/**
* @param in Data input.
* @throws IOException On error.
*/
private void checkRead(DataInput in) throws IOException {
assertEquals(false, in.readBoolean());
assertEquals(true, in.readBoolean());
assertEquals(false, in.readBoolean());
assertEquals(17, in.readUnsignedByte());
assertEquals(121, in.readUnsignedByte());
assertEquals(0xfa, in.readUnsignedByte());
assertEquals(17, in.readByte());
assertEquals(121, in.readByte());
assertEquals((byte)0xfa, in.readByte());
assertEquals('z', in.readChar());
assertEquals('o', in.readChar());
assertEquals('r', in.readChar());
assertEquals(100, in.readShort());
assertEquals(Short.MIN_VALUE, in.readShort());
assertEquals(Short.MAX_VALUE, in.readShort());
assertEquals(-1, in.readShort());
assertEquals(0, in.readShort());
assertEquals(Integer.MAX_VALUE, in.readInt());
assertEquals(Integer.MIN_VALUE, in.readInt());
assertEquals(-1, in.readInt());
assertEquals(0, in.readInt());
assertEquals(1, in.readInt());
assertEquals(0.33f, in.readFloat());
assertEquals(0.5f, in.readFloat());
assertEquals(-0.7f, in.readFloat());
assertEquals(Float.MAX_VALUE, in.readFloat());
assertEquals(Float.MIN_VALUE, in.readFloat());
assertEquals(Float.MIN_NORMAL, in.readFloat());
assertEquals(Float.POSITIVE_INFINITY, in.readFloat());
assertEquals(Float.NEGATIVE_INFINITY, in.readFloat());
assertEquals(Float.NaN, in.readFloat());
assertEquals(-12312312.3333333336666779, in.readDouble());
assertEquals(123123.234, in.readDouble());
assertEquals(Double.MAX_VALUE, in.readDouble());
assertEquals(Double.MIN_VALUE, in.readDouble());
assertEquals(Double.MIN_NORMAL, in.readDouble());
assertEquals(Double.NEGATIVE_INFINITY, in.readDouble());
assertEquals(Double.POSITIVE_INFINITY, in.readDouble());
assertEquals(Double.NaN, in.readDouble());
assertEquals(Long.MAX_VALUE, in.readLong());
assertEquals(Long.MIN_VALUE, in.readLong());
assertEquals(0, in.readLong());
assertEquals(-1, in.readLong());
byte[] b = new byte[3];
in.readFully(b);
assertTrue(Arrays.equals(new byte[] {1, 2, 3}, b));
b = new byte[4];
in.readFully(b, 1, 2);
assertTrue(Arrays.equals(new byte[] {0, 1, 2, 0}, b));
assertEquals("mom washes rum", in.readUTF());
}
}