blob: 58101d6a0ab1d7d02bcaefe68fba45664a1f31af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.schema.registry;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.mapping.ColumnMapper;
import org.apache.ignite.internal.schema.mapping.ColumnMapping;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
/**
* Caching registry of actual schema descriptors for a table.
*/
public class SchemaRegistryImpl implements SchemaRegistry {
/** Cached schemas. */
private final ConcurrentNavigableMap<Integer, SchemaDescriptor> schemaCache = new ConcurrentSkipListMap<>();
/** Column mappers cache. */
private final Map<Long, ColumnMapper> mappingCache = new ConcurrentHashMap<>();
/**
* Schema store. It's only safe to apply the function to version numbers for which there is guarantee that the schema was already saved
* to the Metastore.
*/
private final SchemaDescriptorLoader schemaDescriptorLoader;
private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
private final InFlightFutures inFlightTableSchemaFutures = new InFlightFutures();
/**
* Constructor.
*
* @param schemaDescriptorLoader Schema history.
* @param initialSchema Initial schema.
*/
public SchemaRegistryImpl(SchemaDescriptorLoader schemaDescriptorLoader, SchemaDescriptor initialSchema) {
this.schemaDescriptorLoader = schemaDescriptorLoader;
makeSchemaVersionAvailable(initialSchema);
}
private void makeSchemaVersionAvailable(SchemaDescriptor desc) {
schemaCache.putIfAbsent(desc.version(), desc);
versionTracker.update(desc.version(), null);
}
@Override
public SchemaDescriptor lastKnownSchema() {
return schema(lastKnownSchemaVersion());
}
@Override
public SchemaDescriptor schema(int version) {
int actualVersion = versionOrLatestForZero(version);
SchemaDescriptor desc = getFromCacheOrLoad(actualVersion);
if (desc != null) {
return desc;
}
if (actualVersion <= 0 || actualVersion > schemaCache.lastKey()) {
throw new SchemaRegistryException("Incorrect schema version requested: ver=" + actualVersion);
} else {
throw failedToFindSchemaException(actualVersion);
}
}
private @Nullable SchemaDescriptor getFromCacheOrLoad(int version) {
SchemaDescriptor desc = schemaCache.get(version);
if (desc != null) {
return desc;
}
desc = loadStoredSchemaByVersion(version);
if (desc != null) {
makeSchemaVersionAvailable(desc);
}
return desc;
}
private static SchemaRegistryException failedToFindSchemaException(int version) {
return new SchemaRegistryException("Failed to find schema (was it compacted away?) [version=" + version + "]");
}
private int versionOrLatestForZero(int version) {
if (version == 0) {
// Use last version (any version may be used) for 0 version, that mean row doesn't contain value.
return schemaCache.lastKey();
} else {
return version;
}
}
@Override
public CompletableFuture<SchemaDescriptor> schemaAsync(int version) {
if (version <= 0) {
return failedFuture(new SchemaRegistryException("Unsupported schema version [version=" + version + "]"));
}
SchemaDescriptor desc = getFromCacheOrLoad(version);
if (desc != null) {
return completedFuture(desc);
}
return tableSchemaAsync(version)
.whenComplete((loadedDesc, ex) -> {
if (ex == null) {
if (loadedDesc == null) {
throw failedToFindSchemaException(version);
}
makeSchemaVersionAvailable(loadedDesc);
}
});
}
@Override
public int lastKnownSchemaVersion() {
return schemaCache.lastKey();
}
@Override
public Row resolve(BinaryRow row, int targetSchemaVersion) {
SchemaDescriptor targetSchema = schema(targetSchemaVersion);
throwIfNoSuchSchema(targetSchema, targetSchemaVersion);
return resolveInternal(row, targetSchema, false);
}
@Override
public Row resolve(BinaryRow row, SchemaDescriptor schemaDescriptor) {
return resolveInternal(row, schemaDescriptor, false);
}
@Override
public List<Row> resolve(Collection<BinaryRow> binaryRows, int targetSchemaVersion) {
return resolveInternal(binaryRows, targetSchemaVersion, false);
}
private static void throwIfNoSuchSchema(SchemaDescriptor targetSchema, int targetSchemaVersion) {
if (targetSchema == null) {
throw new SchemaRegistryException("No schema found: schemaVersion=" + targetSchemaVersion);
}
}
@Override
public List<Row> resolveKeys(Collection<BinaryRow> keyOnlyRows, int targetSchemaVersion) {
return resolveInternal(keyOnlyRows, targetSchemaVersion, true);
}
@Override
public void close() {
versionTracker.close();
inFlightTableSchemaFutures.cancelInFlightFutures();
}
/**
* Resolves a schema for row. The method is optimal when the latest schema is already got.
*
* @param binaryRow Binary row.
* @param targetSchema The target schema.
* @param keyOnly {@code true} if the given {@code binaryRow} only contains a key component, {@code false} otherwise.
* @return Schema-aware row.
* @throws SchemaRegistryException if no schema exists for the given row.
*/
private Row resolveInternal(BinaryRow binaryRow, SchemaDescriptor targetSchema, boolean keyOnly) {
if (binaryRow.schemaVersion() == 0 || targetSchema.version() == binaryRow.schemaVersion()) {
return keyOnly ? Row.wrapKeyOnlyBinaryRow(targetSchema, binaryRow) : Row.wrapBinaryRow(targetSchema, binaryRow);
}
SchemaDescriptor rowSchema = schema(binaryRow.schemaVersion());
ColumnMapper mapping = resolveMapping(targetSchema, rowSchema);
if (keyOnly) {
Row row = Row.wrapKeyOnlyBinaryRow(rowSchema, binaryRow);
return UpgradingRowAdapter.upgradeKeyOnlyRow(targetSchema, mapping, row);
} else {
Row row = Row.wrapBinaryRow(rowSchema, binaryRow);
return UpgradingRowAdapter.upgradeRow(targetSchema, mapping, row);
}
}
private List<Row> resolveInternal(Collection<BinaryRow> binaryRows, int targetSchemaVersion, boolean keyOnly) {
SchemaDescriptor targetSchema = schema(targetSchemaVersion);
throwIfNoSuchSchema(targetSchema, targetSchemaVersion);
var rows = new ArrayList<Row>(binaryRows.size());
for (BinaryRow row : binaryRows) {
rows.add(row == null ? null : resolveInternal(row, targetSchema, keyOnly));
}
return rows;
}
/**
* Get cached or create a column mapper that maps column of current schema to the row schema as they may have different schema indices.
*
* @param curSchema Target schema.
* @param rowSchema Row schema.
* @return Column mapper for target schema.
*/
private ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) {
assert curSchema.version() > rowSchema.version();
if (curSchema.version() == rowSchema.version() + 1) {
return curSchema.columnMapping();
}
long mappingKey = (((long) curSchema.version()) << 32) | (rowSchema.version());
ColumnMapper mapping = mappingCache.get(mappingKey);
if (mapping != null) {
return mapping;
}
mapping = schema(rowSchema.version() + 1).columnMapping();
for (int i = rowSchema.version() + 2; i <= curSchema.version(); i++) {
mapping = ColumnMapping.mergeMapping(mapping, schema(i));
}
mappingCache.putIfAbsent(mappingKey, mapping);
return mapping;
}
/**
* Registers new schema.
*
* @param desc Schema descriptor.
* @throws SchemaRegistrationConflictException If schema of provided version was already registered.
* @throws SchemaRegistryException If schema of incorrect version provided.
*/
public void onSchemaRegistered(SchemaDescriptor desc) {
int lastVer = schemaCache.lastKey();
if (desc.version() != lastVer + 1) {
if (desc.version() > 0 && desc.version() <= lastVer) {
throw new SchemaRegistrationConflictException("Schema with given version has been already registered: " + desc.version());
}
throw new SchemaRegistryException("Try to register schema of wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
}
makeSchemaVersionAvailable(desc);
}
private CompletableFuture<SchemaDescriptor> tableSchemaAsync(int schemaVer) {
if (schemaVer < lastKnownSchemaVersion()) {
return completedFuture(loadStoredSchemaByVersion(schemaVer));
}
CompletableFuture<SchemaDescriptor> future = versionTracker.waitFor(schemaVer)
.thenApply(unused -> schemaCache.get(schemaVer));
inFlightTableSchemaFutures.registerFuture(future);
return future;
}
private @Nullable SchemaDescriptor loadStoredSchemaByVersion(int schemaVer) {
return schemaDescriptorLoader.load(schemaVer);
}
}