blob: 4921629a94da7c93d5d121562ca2c25e3c40ad7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.segment.standby.codec;
import static com.google.common.collect.Iterables.elementsEqual;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createBlobChunkBuffer;
import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createMask;
import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.hash;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.InputStream;
import java.util.UUID;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class ResponseDecoderTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
@Test
public void unrecognizedMessagesShouldBeDropped() throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
buf.writeByte(-1);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
assertNull(channel.readInbound());
}
@Test
public void shouldDecodeValidOneChunkGetBlobResponses() throws Exception {
byte[] blobData = new byte[] {1, 2, 3};
String blobId = "blobId";
byte mask = createMask(1, 1);
ByteBuf buf = createBlobChunkBuffer(Messages.HEADER_BLOB, 3L, blobId, blobData, mask);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
GetBlobResponse response = (GetBlobResponse) channel.readInbound();
assertEquals("blobId", response.getBlobId());
assertEquals(blobData.length, response.getLength());
try (InputStream is = response.getInputStream()) {
byte[] receivedData = IOUtils.toByteArray(is);
assertArrayEquals(blobData, receivedData);
}
}
@Test
public void shouldDecodeValidTwoChunksGetBlobResponses() throws Exception {
byte[] blobData = new byte[] {1, 2, 3, 4};
byte[] firstChunkData = new byte[] {1, 2};
byte[] secondChunkbData = new byte[] {3, 4};
String blobId = "blobId";
byte firstMask = createMask(1, 2);
ByteBuf firstBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, firstChunkData, firstMask);
byte secondMask = createMask(2, 2);
ByteBuf secondBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, secondChunkbData, secondMask);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(firstBuf);
channel.writeInbound(secondBuf);
GetBlobResponse response = (GetBlobResponse) channel.readInbound();
assertEquals("blobId", response.getBlobId());
assertEquals(blobData.length, response.getLength());
try (InputStream is = response.getInputStream()) {
byte[] receivedData = IOUtils.toByteArray(is);
assertArrayEquals(blobData, receivedData);
}
}
@Test
public void shouldDropInvalidGetBlobResponses() throws Exception {
byte[] blobData = new byte[] {1, 2, 3};
String blobId = "blobId";
byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
byte mask = createMask(1, 1);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1 + 1 + 8 + 4 + blobIdBytes.length + 8 + blobData.length);
buf.writeByte(Messages.HEADER_BLOB);
buf.writeByte(mask);
buf.writeLong(3L);
buf.writeInt(blobIdBytes.length);
buf.writeBytes(blobIdBytes);
buf.writeLong(hash(mask, 3L, blobData) + 1);
buf.writeBytes(blobData);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
assertNull(channel.readInbound());
}
@Test
public void shouldDecodeValidGetHeadResponses() throws Exception {
String recordId = "recordId";
byte[] recordIdBytes = recordId.getBytes(Charsets.UTF_8);
ByteBuf in = Unpooled.buffer();
in.writeInt(recordIdBytes.length + 1);
in.writeByte(Messages.HEADER_RECORD);
in.writeBytes(recordIdBytes);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(in);
GetHeadResponse response = (GetHeadResponse) channel.readInbound();
assertEquals(recordId, response.getHeadRecordId());
}
@Test
public void shouldDecodeValidGetSegmentResponses() throws Exception {
UUID uuid = new UUID(1, 2);
byte[] data = new byte[] {3, 4, 5};
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 25);
buf.writeByte(Messages.HEADER_SEGMENT);
buf.writeLong(uuid.getMostSignificantBits());
buf.writeLong(uuid.getLeastSignificantBits());
buf.writeLong(hash(data));
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
GetSegmentResponse response = (GetSegmentResponse) channel.readInbound();
assertEquals(uuid, UUID.fromString(response.getSegmentId()));
assertArrayEquals(data, response.getSegmentData());
}
@Test
public void shouldDecodeValidGetReferencesResponses() throws Exception {
byte[] data = "a:b,c".getBytes(Charsets.UTF_8);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 1);
buf.writeByte(Messages.HEADER_REFERENCES);
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
assertEquals("a", response.getSegmentId());
assertTrue(elementsEqual(asList("b", "c"), response.getReferences()));
}
@Test
public void shouldDropGetReferencesResponsesWithoutDelimiter() throws Exception {
byte[] data = "a".getBytes(Charsets.UTF_8);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 1);
buf.writeByte(Messages.HEADER_REFERENCES);
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
assertNull(channel.readInbound());
}
@Test
public void shouldDecodeValidSingleElementGetReferencesResponses() throws Exception {
byte[] data = "a:b".getBytes(Charsets.UTF_8);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 1);
buf.writeByte(Messages.HEADER_REFERENCES);
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
assertEquals("a", response.getSegmentId());
assertTrue(elementsEqual(newArrayList("b"), response.getReferences()));
}
@Test
public void shouldDecodeValidZeroElementsGetReferencesResponses() throws Exception {
byte[] data = "a:".getBytes(Charsets.UTF_8);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 1);
buf.writeByte(Messages.HEADER_REFERENCES);
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
assertEquals("a", response.getSegmentId());
assertTrue(elementsEqual(emptyList(), response.getReferences()));
}
@Test
public void shouldDropInvalidGetSegmentResponses() throws Exception {
UUID uuid = new UUID(1, 2);
byte[] data = new byte[] {3, 4, 5};
ByteBuf buf = Unpooled.buffer();
buf.writeInt(data.length + 25);
buf.writeByte(Messages.HEADER_SEGMENT);
buf.writeLong(uuid.getMostSignificantBits());
buf.writeLong(uuid.getLeastSignificantBits());
buf.writeLong(hash(data) + 1);
buf.writeBytes(data);
EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder(folder.newFolder()));
channel.writeInbound(buf);
assertNull(channel.readInbound());
}
}