blob: cecab95cbc437234751ae37119f5452c11d7332c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.schema.registry;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
import static org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.type.NativeTypes.BYTES;
import static org.apache.ignite.internal.type.NativeTypes.INT64;
import static org.apache.ignite.internal.type.NativeTypes.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Tests for {@link SchemaRegistryImpl}: registering schema versions and looking schemas up by version.
 */
public class SchemaRegistryImplTest {
    /** Schema of version 1: the key column plus the bytes value column. */
    private final SchemaDescriptor schemaV1 = schemaOf(1, bytesValueColumn());

    /** Schema of version 2: the key column plus the bytes and string value columns. */
    private final SchemaDescriptor schemaV2 = schemaOf(2, bytesValueColumn(), stringValueColumn());

    /**
     * Sets a column mapping on {@code schemaV2}; the mapping is created from {@code schemaV2} itself
     * with the {@code valStringCol} column added to it.
     */
    @BeforeEach
    void setupSchemas() {
        Column addedColumn = schemaV2.column("valStringCol");

        assertNotNull(addedColumn);

        schemaV2.columnMapping(createMapper(schemaV2).add(addedColumn));
    }

    /**
     * Checks that schemas with unexpected versions are rejected on registration.
     */
    @Test
    public void testWrongSchemaVersionRegistration() {
        SchemaDescriptor zeroVersionSchema = schemaOf(0, bytesValueColumn());

        SchemaRegistryImpl registry = new SchemaRegistryImpl(ver -> null, schemaV1);

        assertEquals(INITIAL_TABLE_VERSION, registry.lastKnownSchemaVersion());
        assertNotNull(registry.lastKnownSchema());

        // Registering the schema the registry was created with must be reported as a conflict.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(INITIAL_TABLE_VERSION, registry.lastKnownSchemaVersion());
        assertNotNull(registry.schema(INITIAL_TABLE_VERSION));

        // A schema of version 0 must be rejected as well.
        assertThrows(SchemaRegistryException.class, () -> registry.onSchemaRegistered(zeroVersionSchema));

        assertEquals(INITIAL_TABLE_VERSION, registry.lastKnownSchemaVersion());
        assertNotNull(registry.schema(0));

        // Version 2 directly follows the last known version, so it must be accepted.
        registry.onSchemaRegistered(schemaV2);

        assertEquals(schemaV2.version(), registry.lastKnownSchemaVersion());

        assertNotNull(registry.schema(INITIAL_TABLE_VERSION));
        assertNotNull(registry.schema(0));
        assertNotNull(registry.schema(1));
        assertNotNull(registry.schema(2));
    }

    /**
     * Checks registration of consecutive versions and rejection of a version that skips ahead.
     */
    @Test
    public void testSchemaRegistration() {
        SchemaDescriptor schemaV4 = schemaOf(4, bytesValueColumn(), stringValueColumn());

        SchemaRegistryImpl registry = new SchemaRegistryImpl(ver -> null, schemaV1);

        assertEquals(INITIAL_TABLE_VERSION, registry.lastKnownSchemaVersion());
        assertNotNull(registry.lastKnownSchema());

        // The first version is already known: a second registration is a conflict and changes nothing.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(1, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV1, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));

        // The next version is accepted.
        registry.onSchemaRegistered(schemaV2);

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));

        // Version 4 skips version 3 and must be rejected, leaving the registry untouched.
        assertThrows(SchemaRegistryException.class, () -> registry.onSchemaRegistered(schemaV4));

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));

        try {
            // Lookups of versions that were never registered are expected to stay incomplete.
            assertThat(supplyAsync(() -> registry.schema(3)), not(completedFuture()));
            assertThat(supplyAsync(() -> registry.schema(4)), not(completedFuture()));
        } finally {
            // NOTE(review): presumably closing releases the lookups started above - confirm against SchemaRegistryImpl.
            registry.close();
        }
    }

    /**
     * Checks that registering an already known version is reported as a conflict and has no effect.
     */
    @Test
    public void testDuplicateSchemaRegistration() {
        // Same version as schemaV1, but with a different set of value columns.
        SchemaDescriptor conflictingSchema = schemaOf(1, bytesValueColumn(), stringValueColumn());

        SchemaRegistryImpl registry = new SchemaRegistryImpl(ver -> null, schemaV1);

        assertEquals(INITIAL_TABLE_VERSION, registry.lastKnownSchemaVersion());

        // The very first version is already known, so registering it is a conflict.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(1, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV1, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));

        // Repeating the attempt gives the same outcome.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(1, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV1, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));

        // A different schema with the same version is a conflict too, and nothing gets replaced.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(conflictingSchema));

        assertEquals(1, registry.lastKnownSchemaVersion());
        assertEquals(1, registry.lastKnownSchema().version());
        assertSameSchema(schemaV1, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));

        // The next version is accepted.
        registry.onSchemaRegistered(schemaV2);

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));
    }

    /**
     * Checks a registry whose history source knows every version up to the initial one (versions 1 and 2).
     */
    @Test
    public void testInitialSchemaWithFullHistory() {
        SchemaDescriptor schemaV3 = schemaOf(3, stringValueColumn());
        SchemaDescriptor schemaV4 = schemaOf(4, bytesValueColumn(), stringValueColumn());

        Map<Integer, SchemaDescriptor> knownSchemas = schemaHistory(schemaV1, schemaV2);

        SchemaRegistryImpl registry = new SchemaRegistryImpl(knownSchemas::get, schemaV2);

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));

        // A version that is already known is a conflict.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));

        // Version 4 is out of order (version 3 is not registered yet) and must be rejected.
        assertThrows(SchemaRegistryException.class, () -> registry.onSchemaRegistered(schemaV4));

        assertEquals(2, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV2, registry.lastKnownSchema());

        // The next version is accepted.
        registry.onSchemaRegistered(schemaV3);

        assertEquals(3, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV3, registry.lastKnownSchema());
        assertSameSchema(schemaV1, registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));
        assertSameSchema(schemaV3, registry.schema(3));
    }

    /**
     * Checks a registry whose history source knows only the latest versions (2 and 3, but not 1).
     */
    @Test
    public void testInitialSchemaWithTailHistory() {
        SchemaDescriptor schemaV3 = schemaOf(3, stringValueColumn());
        SchemaDescriptor schemaV4 = schemaOf(4, bytesValueColumn(), stringValueColumn());
        SchemaDescriptor schemaV5 = schemaOf(5, stringValueColumn());

        Map<Integer, SchemaDescriptor> knownSchemas = schemaHistory(schemaV2, schemaV3);

        SchemaRegistryImpl registry = new SchemaRegistryImpl(knownSchemas::get, schemaV3);

        assertEquals(3, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV3, registry.lastKnownSchema());

        // Version 1 is absent from the history, so looking it up fails.
        assertThrows(SchemaRegistryException.class, () -> registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));
        assertSameSchema(schemaV3, registry.schema(3));

        // A version that is already known is a conflict.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV2));

        assertEquals(3, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV3, registry.lastKnownSchema());

        // Version 5 is out of order (version 4 is not registered yet) and must be rejected.
        assertThrows(SchemaRegistryException.class, () -> registry.onSchemaRegistered(schemaV5));

        assertEquals(3, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV3, registry.lastKnownSchema());

        // An outdated version is a conflict and does not become available for lookup.
        assertThrows(SchemaRegistrationConflictException.class, () -> registry.onSchemaRegistered(schemaV1));

        assertEquals(3, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV3, registry.lastKnownSchema());
        assertThrows(SchemaRegistryException.class, () -> registry.schema(1));
        assertSameSchema(schemaV2, registry.schema(2));
        assertSameSchema(schemaV3, registry.schema(3));

        // The next version is accepted.
        registry.onSchemaRegistered(schemaV4);

        assertEquals(4, registry.lastKnownSchemaVersion());
        assertSameSchema(schemaV4, registry.lastKnownSchema());
        assertSameSchema(schemaV2, registry.schema(2));
        assertSameSchema(schemaV3, registry.schema(3));
        assertSameSchema(schemaV4, registry.schema(4));
    }

    /**
     * Checks asynchronous lookup: version 0 fails, a known version is returned, and a version that is not
     * registered yet stays pending until it gets registered.
     */
    @Test
    void schemaAsyncReturnsExpectedResults() {
        Map<Integer, SchemaDescriptor> knownSchemas = schemaHistory(schemaV1);

        SchemaRegistryImpl registry = new SchemaRegistryImpl(knownSchemas::get, schemaV1);

        assertThat(registry.schemaAsync(0), willThrow(SchemaRegistryException.class));
        assertThat(registry.schemaAsync(1), willBe(schemaV1));

        // Version 2 is not registered yet, so the future must not complete within the timeout.
        CompletableFuture<SchemaDescriptor> pendingV2 = registry.schemaAsync(2);

        assertThat(pendingV2, willTimeoutIn(100, TimeUnit.MILLISECONDS));

        // Registering version 2 completes the previously obtained future.
        registry.onSchemaRegistered(schemaV2);

        assertThat(pendingV2, willBe(schemaV2));
    }

    /**
     * Builds a schema history keyed by schema version.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * @param history Table schema history.
     * @return Map from schema version to the schema descriptor of that version.
     */
    private Map<Integer, SchemaDescriptor> schemaHistory(SchemaDescriptor... history) {
        return Arrays.stream(history)
                .collect(toMap(SchemaDescriptor::version, Function.identity()));
    }

    /**
     * Asserts that two schema descriptors have the same version and are equal according to
     * {@link SchemaUtils#equalSchemas}.
     *
     * @param schemaDesc1 Schema descriptor to compare with.
     * @param schemaDesc2 Schema descriptor to compare.
     */
    private void assertSameSchema(SchemaDescriptor schemaDesc1, SchemaDescriptor schemaDesc2) {
        int expectedVersion = schemaDesc1.version();
        int actualVersion = schemaDesc2.version();

        assertEquals(expectedVersion, actualVersion, "Descriptors of different versions.");

        boolean sameStructure = SchemaUtils.equalSchemas(schemaDesc1, schemaDesc2);

        assertTrue(sameStructure, "Schemas are not equal.");
    }

    /**
     * Creates a schema descriptor of the given version with the common key column and the given value columns.
     *
     * @param version Schema version.
     * @param valueColumns Value columns.
     * @return New schema descriptor.
     */
    private static SchemaDescriptor schemaOf(int version, Column... valueColumns) {
        return new SchemaDescriptor(version, keyColumns(), valueColumns);
    }

    /** Creates a fresh key column array holding the single {@code keyLongCol} column. */
    private static Column[] keyColumns() {
        return new Column[]{new Column("keyLongCol", INT64, false)};
    }

    /** Creates a fresh {@code valBytesCol} value column. */
    private static Column bytesValueColumn() {
        return new Column("valBytesCol", BYTES, true);
    }

    /** Creates a fresh {@code valStringCol} value column. */
    private static Column stringValueColumn() {
        return new Column("valStringCol", STRING, true);
    }
}