blob: d041b080d69bf3259ca36b4632f8a71e9a7150c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.Footer;
import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShard;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShardCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefix;
import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.CoderPropertiesTest.NonDeterministicCoder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link IsmFormat}. */
@RunWith(JUnit4.class)
public class IsmFormatTest {
  /** A non-deterministic coder, used to provoke coder-compatibility validation failures. */
  private static final Coder<String> NON_DETERMINISTIC_CODER = new NonDeterministicCoder();

  @Rule public ExpectedException expectedException = ExpectedException.none();

  // NOTE(review): this rule is not referenced by any test in this class; confirm whether it is
  // still needed before removing it (its import would also become unused).
  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();

  /** A non-deterministic coder used for a shard key component must be rejected. */
  @Test
  public void testUsingNonDeterministicShardKeyCoder() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("is expected to be deterministic");
    IsmFormat.validateCoderIsCompatible(
        IsmRecordCoder.of(
            1, // number of shard key coders for value records
            0, // number of shard key coders for metadata records
            ImmutableList.<Coder<?>>of(NON_DETERMINISTIC_CODER, ByteArrayCoder.of()),
            ByteArrayCoder.of()));
  }

  /** A non-deterministic coder used for a non-shard key component must also be rejected. */
  @Test
  public void testUsingNonDeterministicNonShardKeyCoder() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("is expected to be deterministic");
    IsmFormat.validateCoderIsCompatible(
        IsmRecordCoder.of(
            1, // number of shard key coders for value records
            0, // number of shard key coders for metadata records
            ImmutableList.<Coder<?>>of(ByteArrayCoder.of(), NON_DETERMINISTIC_CODER),
            ByteArrayCoder.of()));
  }

  /** Checks the standard coder properties of {@link KeyPrefixCoder} and its size estimate. */
  @Test
  public void testKeyPrefixCoder() throws Exception {
    KeyPrefix keyPrefixA = KeyPrefix.of(5, 7);
    KeyPrefix keyPrefixB = KeyPrefix.of(5, 7);
    CoderProperties.coderDecodeEncodeEqual(KeyPrefixCoder.of(), keyPrefixA);
    CoderProperties.coderDeterministic(KeyPrefixCoder.of(), keyPrefixA, keyPrefixB);
    CoderProperties.coderConsistentWithEquals(KeyPrefixCoder.of(), keyPrefixA, keyPrefixB);
    CoderProperties.coderSerializable(KeyPrefixCoder.of());
    CoderProperties.structuralValueConsistentWithEquals(
        KeyPrefixCoder.of(), keyPrefixA, keyPrefixB);
    assertTrue(KeyPrefixCoder.of().isRegisterByteSizeObserverCheap(keyPrefixA));
    // The prefix (5, 7) is expected to encode into exactly 2 bytes.
    assertEquals(2, KeyPrefixCoder.of().getEncodedElementByteSize(keyPrefixA));
  }

  /** Checks the standard coder properties of {@link FooterCoder} and its fixed encoded size. */
  @Test
  public void testFooterCoder() throws Exception {
    Footer footerA = Footer.of(1, 2, 3);
    Footer footerB = Footer.of(1, 2, 3);
    CoderProperties.coderDecodeEncodeEqual(FooterCoder.of(), footerA);
    CoderProperties.coderDeterministic(FooterCoder.of(), footerA, footerB);
    CoderProperties.coderConsistentWithEquals(FooterCoder.of(), footerA, footerB);
    CoderProperties.coderSerializable(FooterCoder.of());
    CoderProperties.structuralValueConsistentWithEquals(FooterCoder.of(), footerA, footerB);
    assertTrue(FooterCoder.of().isRegisterByteSizeObserverCheap(footerA));
    // An encoded footer is expected to occupy 25 bytes (see also testUnknownVersion).
    assertEquals(25, FooterCoder.of().getEncodedElementByteSize(footerA));
  }

  /** A value record must not use the metadata key as a key component. */
  @Test
  public void testNormalIsmRecordWithMetadataKeyIsError() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Expected key components to not contain metadata key");
    IsmRecord.of(ImmutableList.of(IsmFormat.getMetadataKey()), "test");
  }

  /** A metadata record must use the metadata key as a key component. */
  @Test
  public void testMetadataIsmRecordWithoutMetadataKeyIsError() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Expected key components to contain metadata key");
    IsmRecord.meta(ImmutableList.of("non-metadata key"), "test".getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Checks the standard coder properties of {@link IsmRecordCoder} for three cases: value
   * records with a plain coder, value records with a metadata-aware coder, and metadata records
   * with a metadata-aware coder.
   */
  @Test
  public void testIsmRecordCoder() throws Exception {
    IsmRecord<String> ismRecordA = IsmRecord.of(ImmutableList.of("0"), "1");
    IsmRecord<String> ismRecordB = IsmRecord.of(ImmutableList.of("0"), "1");
    IsmRecord<String> ismMetaRecordA =
        IsmRecord.meta(
            ImmutableList.of(IsmFormat.getMetadataKey()), "2".getBytes(StandardCharsets.UTF_8));
    IsmRecord<String> ismMetaRecordB =
        IsmRecord.meta(
            ImmutableList.of(IsmFormat.getMetadataKey()), "2".getBytes(StandardCharsets.UTF_8));
    IsmRecordCoder<String> coder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(StringUtf8Coder.of()), StringUtf8Coder.of());
    IsmRecordCoder<String> coderWithMetadata =
        IsmRecordCoder.of(
            1,
            1,
            ImmutableList.<Coder<?>>of(MetadataKeyCoder.of(StringUtf8Coder.of())),
            StringUtf8Coder.of());

    // Non-metadata records against coder without metadata key support
    CoderProperties.coderDecodeEncodeEqual(coder, ismRecordA);
    CoderProperties.coderDeterministic(coder, ismRecordA, ismRecordB);
    CoderProperties.coderConsistentWithEquals(coder, ismRecordA, ismRecordB);
    CoderProperties.coderSerializable(coder);
    CoderProperties.structuralValueConsistentWithEquals(coder, ismRecordA, ismRecordB);

    // Non-metadata records against coder with metadata key support. Serializability of
    // coderWithMetadata is checked once here; it does not depend on the records under test.
    CoderProperties.coderDecodeEncodeEqual(coderWithMetadata, ismRecordA);
    CoderProperties.coderDeterministic(coderWithMetadata, ismRecordA, ismRecordB);
    CoderProperties.coderConsistentWithEquals(coderWithMetadata, ismRecordA, ismRecordB);
    CoderProperties.coderSerializable(coderWithMetadata);
    CoderProperties.structuralValueConsistentWithEquals(coderWithMetadata, ismRecordA, ismRecordB);

    // Metadata records
    CoderProperties.coderDecodeEncodeEqual(coderWithMetadata, ismMetaRecordA);
    CoderProperties.coderDeterministic(coderWithMetadata, ismMetaRecordA, ismMetaRecordB);
    CoderProperties.coderConsistentWithEquals(coderWithMetadata, ismMetaRecordA, ismMetaRecordB);
    CoderProperties.structuralValueConsistentWithEquals(
        coderWithMetadata, ismMetaRecordA, ismMetaRecordB);
  }

  /**
   * Value-record hashes must fall below {@code SHARD_BITS + 1}. Metadata-record hashes must fall
   * strictly between {@code SHARD_BITS} and {@code (SHARD_BITS + 1) * 2}.
   */
  @Test
  public void testIsmRecordCoderHashWithinExpectedRanges() throws Exception {
    IsmRecordCoder<String> coder =
        IsmRecordCoder.of(
            2,
            0,
            ImmutableList.<Coder<?>>of(StringUtf8Coder.of(), StringUtf8Coder.of()),
            StringUtf8Coder.of());
    IsmRecordCoder<String> coderWithMetadata =
        IsmRecordCoder.of(
            2,
            2,
            ImmutableList.<Coder<?>>of(
                MetadataKeyCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of()),
            StringUtf8Coder.of());
    assertTrue(coder.hash(ImmutableList.of("A", "B")) < IsmFormat.SHARD_BITS + 1);
    int hash = coderWithMetadata.hash(ImmutableList.of(IsmFormat.getMetadataKey(), "B"));
    assertTrue(hash > IsmFormat.SHARD_BITS && hash < (IsmFormat.SHARD_BITS + 1) * 2);
  }

  /** Hashing more key components than the coder has key coders for is rejected. */
  @Test
  public void testIsmRecordCoderWithTooManyKeysIsError() throws Exception {
    IsmRecordCoder<String> coder =
        IsmRecordCoder.of(
            2,
            0,
            ImmutableList.<Coder<?>>of(StringUtf8Coder.of(), StringUtf8Coder.of()),
            StringUtf8Coder.of());
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Expected at most");
    coder.hash(ImmutableList.of("A", "B", "C"));
  }

  /** Hashing fewer key components than the value-record shard key count is rejected. */
  @Test
  public void testIsmRecordCoderHashWithoutEnoughKeysIsError() throws Exception {
    IsmRecordCoder<String> coder =
        IsmRecordCoder.of(
            2,
            0,
            ImmutableList.<Coder<?>>of(StringUtf8Coder.of(), StringUtf8Coder.of()),
            StringUtf8Coder.of());
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Expected at least");
    coder.hash(ImmutableList.of("A"));
  }

  /** Hashing fewer key components than the metadata-record shard key count is rejected. */
  @Test
  public void testIsmRecordCoderMetadataHashWithoutEnoughKeysIsError() throws Exception {
    IsmRecordCoder<String> coderWithMetadata =
        IsmRecordCoder.of(
            2,
            2,
            ImmutableList.<Coder<?>>of(
                MetadataKeyCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of()),
            StringUtf8Coder.of());
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Expected at least");
    coderWithMetadata.hash(ImmutableList.of(IsmFormat.getMetadataKey()));
  }

  /** Encoding a record whose key arity differs from the coder's key coder count fails. */
  @Test
  public void testIsmRecordCoderKeyCoderCountMismatch() throws Exception {
    IsmRecord<String> ismRecord = IsmRecord.of(ImmutableList.of("0", "too many"), "1");
    IsmRecordCoder<String> coder =
        IsmRecordCoder.of(
            1, 0, ImmutableList.<Coder<?>>of(StringUtf8Coder.of()), StringUtf8Coder.of());
    expectedException.expect(CoderException.class);
    expectedException.expectMessage("Expected 1 key component(s) but received key");
    coder.encode(ismRecord, new ByteArrayOutputStream());
  }

  /** Verifies equals/hashCode consistency and toString content for value and metadata records. */
  @Test
  public void testIsmRecordToStringEqualsAndHashCode() {
    IsmRecord<String> ismRecordA = IsmRecord.of(ImmutableList.of("0"), "1");
    IsmRecord<String> ismRecordB = IsmRecord.of(ImmutableList.of("0"), "1");
    IsmRecord<String> ismRecordC = IsmRecord.of(ImmutableList.of("3"), "4");
    IsmRecord<String> ismRecordAWithMeta =
        IsmRecord.meta(
            ImmutableList.of(IsmFormat.getMetadataKey(), "0"),
            "2".getBytes(StandardCharsets.UTF_8));
    IsmRecord<String> ismRecordBWithMeta =
        IsmRecord.meta(
            ImmutableList.of(IsmFormat.getMetadataKey(), "0"),
            "2".getBytes(StandardCharsets.UTF_8));
    IsmRecord<String> ismRecordCWithMeta =
        IsmRecord.meta(
            ImmutableList.of(IsmFormat.getMetadataKey(), "0"),
            "5".getBytes(StandardCharsets.UTF_8));

    assertEquals(ismRecordA, ismRecordB);
    assertEquals(ismRecordAWithMeta, ismRecordBWithMeta);
    assertNotEquals(ismRecordA, ismRecordAWithMeta);
    assertNotEquals(ismRecordA, ismRecordC);
    assertNotEquals(ismRecordAWithMeta, ismRecordCWithMeta);

    assertEquals(ismRecordA.hashCode(), ismRecordB.hashCode());
    assertEquals(ismRecordAWithMeta.hashCode(), ismRecordBWithMeta.hashCode());
    assertNotEquals(ismRecordA.hashCode(), ismRecordAWithMeta.hashCode());
    assertNotEquals(ismRecordA.hashCode(), ismRecordC.hashCode());
    assertNotEquals(ismRecordAWithMeta.hashCode(), ismRecordCWithMeta.hashCode());

    assertThat(
        ismRecordA.toString(),
        allOf(containsString("keyComponents=[0]"), containsString("value=1")));
    assertThat(
        ismRecordAWithMeta.toString(),
        allOf(containsString("keyComponents=[META, 0]"), containsString("metadata=")));
  }

  /** Checks the standard coder properties of {@link IsmShardCoder}. */
  @Test
  public void testIsmShardCoder() throws Exception {
    IsmShard shardA = IsmShard.of(1, 2, 3);
    IsmShard shardB = IsmShard.of(1, 2, 3);
    CoderProperties.coderDecodeEncodeEqual(IsmShardCoder.of(), shardA);
    CoderProperties.coderDeterministic(IsmShardCoder.of(), shardA, shardB);
    CoderProperties.coderConsistentWithEquals(IsmShardCoder.of(), shardA, shardB);
    CoderProperties.coderSerializable(IsmShardCoder.of());
    CoderProperties.structuralValueConsistentWithEquals(IsmShardCoder.of(), shardA, shardB);
  }

  /** Verifies equals/hashCode consistency and toString content for {@link IsmShard}. */
  @Test
  public void testIsmShardToStringEqualsAndHashCode() {
    IsmShard shardA = IsmShard.of(1, 2, 3);
    IsmShard shardB = IsmShard.of(1, 2, 3);
    IsmShard shardC = IsmShard.of(4, 5, 6);
    assertEquals(shardA, shardB);
    assertNotEquals(shardA, shardC);
    assertEquals(shardA.hashCode(), shardB.hashCode());
    assertNotEquals(shardA.hashCode(), shardC.hashCode());
    assertThat(
        shardA.toString(),
        allOf(
            containsString("id=1"),
            containsString("blockOffset=2"),
            containsString("indexOffset=3")));
  }

  /** Decoding a footer whose version byte is unrecognized fails with an {@link IOException}. */
  @Test
  public void testUnknownVersion() throws Exception {
    // 25 bytes matches the encoded footer size asserted in testFooterCoder; the final byte
    // (index 24) carries the version, which is set here to an unsupported value.
    byte[] data = new byte[25];
    data[24] = 5; // unknown version
    ByteArrayInputStream bais = new ByteArrayInputStream(data);
    expectedException.expect(IOException.class);
    expectedException.expectMessage("Unknown version 5");
    FooterCoder.of().decode(bais, Context.OUTER);
  }
}