// blob: 57246f0f1e440bb86cd8b1e044c87ebd67a09f8e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
/**
 * {@link SchemaRegistryService} implementation that reads and writes schemas through a
 * {@link SchemaStorage} and validates new schemas with per-type {@link SchemaCompatibilityCheck}s.
 *
 * <p>Schemas are serialized as {@code SchemaRegistryFormat.SchemaInfo} protobuf messages. Deleting a
 * schema is recorded by storing a tombstone entry (deleted flag set, type {@code NONE}, empty payload)
 * through {@link SchemaStorage#put}.
 */
public class SchemaRegistryServiceImpl implements SchemaRegistryService {
    /** Hash used both as the storage context of a schema and to detect byte-identical schemas. */
    private static final HashFunction HASH_FUNCTION = Hashing.sha256();

    private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks;
    private final SchemaStorage schemaStorage;
    private final Clock clock;

    /**
     * Creates a registry with an explicit clock.
     *
     * @param schemaStorage       backing store for serialized schema entries
     * @param compatibilityChecks compatibility checkers keyed by schema type; types without an entry
     *                            fall back to {@code SchemaCompatibilityCheck.DEFAULT}
     * @param clock               source of the timestamps written into stored entries
     */
    @VisibleForTesting
    SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks,
                              Clock clock) {
        this.schemaStorage = schemaStorage;
        this.compatibilityChecks = compatibilityChecks;
        this.clock = clock;
    }

    /**
     * Creates a registry that timestamps entries with the system UTC clock.
     *
     * @param schemaStorage       backing store for serialized schema entries
     * @param compatibilityChecks compatibility checkers keyed by schema type
     */
    @VisibleForTesting
    SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
                              Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks) {
        this(schemaStorage, compatibilityChecks, Clock.systemUTC());
    }

    /**
     * Fetches the latest version of a schema.
     *
     * @param schemaId identifier of the schema
     * @return future holding the latest schema, or {@code null} when the storage has no entry
     */
    @Override
    @NotNull
    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
        return getSchema(schemaId, SchemaVersion.Latest);
    }

    /**
     * Fetches a specific version of a schema.
     *
     * @param schemaId identifier of the schema
     * @param version  version to load
     * @return future holding the decoded schema, or {@code null} when the storage has no entry;
     *         completes exceptionally if the stored bytes cannot be parsed
     */
    @Override
    @NotNull
    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
        return schemaStorage.get(schemaId, version).thenCompose(stored -> {
            if (isNull(stored)) {
                return completedFuture(null);
            }
            return decodeSchema(stored.data)
                .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
        });
    }

    /**
     * Fetches every stored entry of a schema, tombstones included.
     *
     * @param schemaId identifier of the schema
     * @return future holding one future per stored entry, each decoding that entry
     */
    @Override
    public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchemas(String schemaId) {
        return schemaStorage.getAll(schemaId).thenApply(entries ->
            entries.stream()
                .map(entry -> entry.thenCompose(stored ->
                    decodeSchema(stored.data)
                        .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version))))
                .collect(Collectors.toList()));
    }

    /**
     * Stores {@code schema} after verifying it against what is already registered.
     *
     * <p>When no schema exists, or the latest entry is a tombstone, the schema is accepted without a
     * compatibility check. Otherwise it must pass {@link #isCompatible(String, SchemaData,
     * SchemaCompatibilityStrategy)}.
     *
     * @param schemaId identifier of the schema
     * @param schema   schema to store
     * @param strategy compatibility strategy to enforce
     * @return future holding the version returned by the storage; completes exceptionally with
     *         {@link IncompatibleSchemaException} when the schema is rejected
     */
    @Override
    @NotNull
    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
                                                              SchemaCompatibilityStrategy strategy) {
        return getSchema(schemaId)
            .thenCompose(existingSchema -> {
                if (existingSchema == null || existingSchema.schema.isDeleted()) {
                    // Nothing live to compare against: any schema is acceptable.
                    return completedFuture(true);
                } else {
                    return isCompatible(schemaId, schema, strategy);
                }
            })
            .thenCompose(compatible -> {
                if (compatible) {
                    return storeSchema(schemaId, schema);
                } else {
                    return FutureUtil.failedFuture(new IncompatibleSchemaException());
                }
            });
    }

    /**
     * Marks a schema as deleted by storing a tombstone entry with an empty context hash.
     *
     * @param schemaId identifier of the schema
     * @param user     user recorded on the tombstone
     * @return future holding the version returned by the storage for the tombstone
     */
    @Override
    @NotNull
    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
        byte[] tombstone = deleted(schemaId, user).toByteArray();
        return schemaStorage.put(schemaId, tombstone, new byte[]{});
    }

    /**
     * Checks whether {@code schema} may be registered under {@code schemaId}.
     *
     * <p>Transitive strategies are checked against all stored schemas; every other strategy is
     * checked against the latest schema only.
     *
     * @param schemaId identifier of the schema
     * @param schema   candidate schema
     * @param strategy compatibility strategy to apply
     * @return future holding {@code true} when the candidate is compatible
     */
    @Override
    public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
                                                   SchemaCompatibilityStrategy strategy) {
        switch (strategy) {
            case FORWARD_TRANSITIVE:
            case BACKWARD_TRANSITIVE:
            case FULL_TRANSITIVE:
                return checkCompatibilityWithAll(schemaId, schema, strategy);
            default:
                return checkCompatibilityWithLatest(schemaId, schema, strategy);
        }
    }

    /**
     * Converts a serialized version into a {@link SchemaVersion} by delegating to the storage.
     *
     * @param version serialized version bytes
     * @return the version object produced by the storage
     */
    @Override
    public SchemaVersion versionFromBytes(byte[] version) {
        return schemaStorage.versionFromBytes(version);
    }

    /**
     * Closes the underlying schema storage.
     *
     * @throws Exception if the storage fails to close
     */
    @Override
    public void close() throws Exception {
        schemaStorage.close();
    }

    /** Decodes stored bytes into the domain {@link SchemaData}. */
    private static CompletableFuture<SchemaData> decodeSchema(byte[] data) {
        return Functions.bytesToSchemaInfo(data).thenApply(Functions::schemaInfoToSchema);
    }

    /** Serializes {@code schema} and writes it to the storage with the payload hash as context. */
    private CompletableFuture<SchemaVersion> storeSchema(String schemaId, SchemaData schema) {
        byte[] context = HASH_FUNCTION.hashBytes(schema.getData()).asBytes();
        SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
            .setType(Functions.convertFromDomainType(schema.getType()))
            .setSchema(ByteString.copyFrom(schema.getData()))
            .setSchemaId(schemaId)
            .setUser(schema.getUser())
            .setDeleted(false)
            .setTimestamp(clock.millis())
            .addAllProps(toPairs(schema.getProps()))
            .build();
        return schemaStorage.put(schemaId, info.toByteArray(), context);
    }

    /** Builds the tombstone entry written by {@link #deleteSchema(String, String)}. */
    private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) {
        return SchemaRegistryFormat.SchemaInfo.newBuilder()
            .setSchemaId(schemaId)
            .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
            .setSchema(ByteString.EMPTY)
            .setUser(user)
            .setDeleted(true)
            .setTimestamp(clock.millis())
            .build();
    }

    /**
     * Compares a candidate with one existing schema. Byte-identical payloads (equal SHA-256) are
     * always compatible; otherwise the checker registered for the candidate's type decides.
     */
    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
                                 SchemaCompatibilityStrategy strategy) {
        HashCode existingHash = HASH_FUNCTION.hashBytes(existingSchema.schema.getData());
        HashCode newHash = HASH_FUNCTION.hashBytes(newSchema.getData());
        if (newHash.equals(existingHash)) {
            return true;
        }
        return compatibilityChecks
            .getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
            .isCompatible(existingSchema.schema, newSchema, strategy);
    }

    /**
     * Checks the candidate against the latest stored schema only. Yields {@code false} when there is
     * no latest schema or the latest entry is a tombstone.
     */
    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                    SchemaCompatibilityStrategy strategy) {
        return getSchema(schemaId)
            .thenApply(existingSchema ->
                existingSchema != null
                    && !existingSchema.schema.isDeleted()
                    && isCompatible(existingSchema, schema, strategy));
    }

    /**
     * Checks the candidate against every stored schema, as required by the transitive strategies.
     *
     * <p>Tombstones written by {@link #deleteSchema(String, String)} carry no schema definition
     * (type {@code NONE}, empty payload), so they are excluded before the checker runs; only real
     * schemas take part in the comparison.
     */
    private CompletableFuture<Boolean> checkCompatibilityWithAll(String schemaId, SchemaData schema,
                                                                 SchemaCompatibilityStrategy strategy) {
        return getAllSchemas(schemaId)
            .thenCompose(FutureUtils::collect)
            .thenApply(schemaAndMetadataList -> schemaAndMetadataList
                .stream()
                .map(schemaAndMetadata -> schemaAndMetadata.schema)
                .filter(stored -> !stored.isDeleted())
                .collect(Collectors.toList()))
            .thenApply(schemas -> compatibilityChecks
                .getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
                .isCompatible(schemas, schema, strategy));
    }

    /** Conversion helpers between the protobuf storage format and the domain schema types. */
    interface Functions {
        /**
         * Maps a protobuf schema type to the domain {@link SchemaType}.
         *
         * @param type protobuf schema type
         * @return the matching domain type, or {@code SchemaType.NONE} for a negative number
         */
        static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
            if (type.getNumber() < 0) {
                return SchemaType.NONE;
            }
            // the value of type in `SchemaType` is always 1 less than the value of type `SchemaInfo.SchemaType`
            return SchemaType.valueOf(type.getNumber() - 1);
        }

        /**
         * Maps a domain {@link SchemaType} to the protobuf schema type.
         *
         * @param type domain schema type
         * @return the matching protobuf type, or protobuf {@code NONE} for a negative value
         */
        static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaType type) {
            if (type.getValue() < 0) {
                return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
            }
            // Inverse of convertToDomainType: the protobuf number is the domain value plus one.
            return SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
        }

        /**
         * Converts protobuf key/value pairs into a map; a later duplicate key overwrites an earlier one.
         *
         * @param pairs protobuf key/value pairs
         * @return a new mutable map with the same entries
         */
        static Map<String, String> toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
            Map<String, String> map = new HashMap<>();
            for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
                map.put(pair.getKey(), pair.getValue());
            }
            return map;
        }

        /**
         * Converts a property map into protobuf key/value pairs.
         *
         * @param map properties, may be {@code null}
         * @return one pair per entry, or an empty list when {@code map} is {@code null}
         */
        static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> toPairs(Map<String, String> map) {
            if (isNull(map)) {
                return Collections.emptyList();
            }
            List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new ArrayList<>(map.size());
            for (Map.Entry<String, String> entry : map.entrySet()) {
                pairs.add(SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder()
                    .setKey(entry.getKey())
                    .setValue(entry.getValue())
                    .build());
            }
            return pairs;
        }

        /**
         * Builds the domain {@link SchemaData} from a stored protobuf entry.
         *
         * @param info stored protobuf entry
         * @return schema data carrying the entry's user, type, payload, deleted flag and properties
         */
        static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo info) {
            return SchemaData.builder()
                .user(info.getUser())
                .type(convertToDomainType(info.getType()))
                .data(info.getSchema().toByteArray())
                .isDeleted(info.getDeleted())
                .props(toMap(info.getPropsList()))
                .build();
        }

        /**
         * Parses stored bytes into a protobuf entry without throwing.
         *
         * @param bytes serialized {@code SchemaInfo}
         * @return a completed future with the parsed entry, or a future failed with the
         *         {@link InvalidProtocolBufferException} raised by the parser
         */
        static CompletableFuture<SchemaRegistryFormat.SchemaInfo> bytesToSchemaInfo(byte[] bytes) {
            try {
                return completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
            } catch (InvalidProtocolBufferException e) {
                return FutureUtil.failedFuture(e);
            }
        }
    }
}