blob: 0bab772044a6d0071a493f4900361c487535472f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Asynchronous schema operations (get, list, delete, post, compatibility test, version lookup)
 * for the topic held in {@code topicName}, delegating to the broker's {@link SchemaRegistryService}.
 *
 * <p>Every operation first validates topic ownership and then either a {@link TopicOperation}
 * or tenant admin access before touching the registry. Failures are reported through the
 * returned {@link CompletableFuture}.
 */
public class SchemasResourceBase extends AdminResource {

    private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);

    /** Source of the timestamp stamped onto every {@link SchemaData} built from a request payload. */
    private final Clock clock;

    /** Creates a resource that timestamps schemas with the system UTC clock. */
    public SchemasResourceBase() {
        this(Clock.systemUTC());
    }

    /**
     * Creates a resource that timestamps schemas with the given clock.
     *
     * @param clock clock used for {@link SchemaData} timestamps
     */
    @VisibleForTesting
    public SchemasResourceBase(Clock clock) {
        super();
        this.clock = clock;
    }

    /**
     * Extracts the numeric value of a schema version.
     *
     * @param schemaVersion version to inspect
     * @return the wrapped long when {@code schemaVersion} is a {@link LongSchemaVersion}, otherwise {@code -1}
     */
    protected static long getLongSchemaVersion(SchemaVersion schemaVersion) {
        return schemaVersion instanceof LongSchemaVersion
                ? ((LongSchemaVersion) schemaVersion).getVersion()
                : -1L;
    }

    /**
     * Resolves the schema id for the current topic.
     *
     * <p>When {@code topicName.isPartitioned()} is true the schema name is taken from the name returned by
     * {@code getPartitionedTopicName()}; otherwise it is taken from {@code topicName} directly.
     */
    private String getSchemaId() {
        if (!topicName.isPartitioned()) {
            return topicName.getSchemaName();
        }
        return TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
    }

    /**
     * Fetches the schema of the topic using the registry's single-argument lookup.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @return future with the registry's answer for the topic's schema id
     */
    public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                .thenCompose(__ -> pulsar().getSchemaRegistryService().getSchema(getSchemaId()));
    }

    /**
     * Fetches a specific version of the topic's schema.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @param version decimal string form of the requested schema version
     * @return future with the registry's answer for that version; it fails with a {@link RestException}
     *         carrying {@code BAD_REQUEST} when {@code version} is not a valid long
     */
    public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                .thenApply(__ -> getSchemaId())
                .thenCompose(schemaId -> {
                    // Parsing stays after the access checks so their failures keep taking precedence.
                    long requestedVersion = parseSchemaVersion(version);
                    // The registry decodes versions from bytes, so encode the long into Long.BYTES bytes.
                    byte[] versionBytes = ByteBuffer.allocate(Long.BYTES).putLong(requestedVersion).array();
                    SchemaRegistryService registry = pulsar().getSchemaRegistryService();
                    return registry.getSchema(schemaId, registry.versionFromBytes(versionBytes));
                });
    }

    /**
     * Parses a textual schema version.
     *
     * <p>A malformed (or null) value is reported as a {@link RestException} with {@code BAD_REQUEST}
     * instead of letting the raw {@link NumberFormatException} fail the future.
     */
    private static long parseSchemaVersion(String version) {
        try {
            return Long.parseLong(version);
        } catch (NumberFormatException invalidVersion) {
            throw new RestException(Response.Status.BAD_REQUEST.getStatusCode(),
                    "Invalid schema version: " + version);
        }
    }

    /**
     * Lists the schemas of the topic via the registry's {@code trimDeletedSchemaAndGetList}.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @return future with the list returned by the registry
     */
    public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                .thenCompose(__ ->
                        pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(getSchemaId()));
    }

    /**
     * Deletes the schema of the topic. Requires tenant admin access.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @param force forwarded unchanged to the registry's {@code deleteSchema}
     * @return future with the schema version returned by the registry
     */
    public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
        return validateDestinationAndAdminOperationAsync(authoritative)
                .thenCompose(__ -> {
                    String schemaId = getSchemaId();
                    // An absent client app id is recorded as the empty string.
                    String user = defaultIfEmpty(clientAppId(), "");
                    return pulsar().getSchemaRegistryService().deleteSchema(schemaId, user, force);
                });
    }

    /**
     * Stores the payload as the topic's schema through the registry's {@code putSchemaIfAbsent}, using the
     * compatibility strategy resolved by {@code getSchemaCompatibilityStrategyAsyncWithoutAuth()}.
     *
     * @param payload schema definition, type and properties sent by the client
     * @param authoritative forwarded to the topic ownership validation
     * @return future with the schema version returned by the registry
     */
    public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)
                .thenCompose(__ -> getSchemaCompatibilityStrategyAsyncWithoutAuth())
                .thenCompose(schemaCompatibilityStrategy -> {
                    byte[] data = toSchemaBytes(payload);
                    return pulsar().getSchemaRegistryService()
                            .putSchemaIfAbsent(getSchemaId(), newSchemaData(payload, data),
                                    schemaCompatibilityStrategy);
                });
    }

    /**
     * Checks the payload against the topic's existing schema with the registry's {@code isCompatible}.
     * Requires tenant admin access.
     *
     * @param payload schema definition, type and properties sent by the client
     * @param authoritative forwarded to the topic ownership validation
     * @return future with the registry's verdict paired with the strategy that was applied
     */
    public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
            PostSchemaPayload payload, boolean authoritative) {
        return validateDestinationAndAdminOperationAsync(authoritative)
                .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
                .thenCompose(strategy -> {
                    String schemaId = getSchemaId();
                    // NOTE(review): unlike postSchemaAsync, a KEY_VALUE payload is passed as raw UTF-8 bytes
                    // here without conversion - confirm this difference is intended.
                    return pulsar().getSchemaRegistryService()
                            .isCompatible(schemaId,
                                    newSchemaData(payload, payload.getSchema().getBytes(StandardCharsets.UTF_8)),
                                    strategy)
                            .thenApply(compatible -> Pair.of(compatible, strategy));
                });
    }

    /**
     * Looks up the version under which the given schema is stored, via the registry's
     * {@code findSchemaVersion}.
     *
     * @param payload schema definition, type and properties sent by the client
     * @param authoritative forwarded to the topic ownership validation
     * @return future with the version number reported by the registry
     */
    public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                .thenCompose(__ -> {
                    String schemaId = getSchemaId();
                    // NOTE(review): unlike postSchemaAsync, a KEY_VALUE payload is passed as raw UTF-8 bytes
                    // here without conversion - confirm this difference is intended.
                    return pulsar().getSchemaRegistryService()
                            .findSchemaVersion(schemaId,
                                    newSchemaData(payload, payload.getSchema().getBytes(StandardCharsets.UTF_8)));
                });
    }

    /**
     * Produces the bytes stored for a posted schema: KEY_VALUE payloads go through the client
     * implementation's key/value conversion, every other type is the UTF-8 encoding of the schema string.
     */
    private static byte[] toSchemaBytes(PostSchemaPayload payload) {
        if (!SchemaType.KEY_VALUE.name().equals(payload.getType())) {
            return payload.getSchema().getBytes(StandardCharsets.UTF_8);
        }
        try {
            return DefaultImplementation.getDefaultImplementation()
                    .convertKeyValueDataStringToSchemaInfoSchema(
                            payload.getSchema().getBytes(StandardCharsets.UTF_8));
        } catch (IOException conversionError) {
            throw new RestException(conversionError);
        }
    }

    /**
     * Builds the non-deleted {@link SchemaData} for a request: the given bytes, the current clock time,
     * the payload's type and properties, and the client app id (empty string when absent) as the user.
     */
    private SchemaData newSchemaData(PostSchemaPayload payload, byte[] data) {
        return SchemaData.builder()
                .data(data)
                .isDeleted(false)
                .timestamp(clock.millis())
                .type(SchemaType.valueOf(payload.getType()))
                .user(defaultIfEmpty(clientAppId(), ""))
                .props(payload.getProperties())
                .build();
    }

    /** {@inheritDoc} Always {@code "persistent"} for this resource. */
    @Override
    protected String domain() {
        return "persistent";
    }

    /**
     * Maps a registry entry to its REST representation. KEY_VALUE schemas are decoded and rendered through
     * the client implementation; all other types expose their data as a UTF-8 string.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by the key/value conversion
     */
    private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(
            SchemaAndMetadata schemaAndMetadata) {
        final String schemaData;
        if (schemaAndMetadata.schema.getType() == SchemaType.KEY_VALUE) {
            try {
                schemaData = DefaultImplementation.getDefaultImplementation()
                        .convertKeyValueSchemaInfoDataToString(
                                DefaultImplementation.getDefaultImplementation()
                                        .decodeKeyValueSchemaInfo(schemaAndMetadata.schema.toSchemaInfo()));
            } catch (IOException conversionError) {
                throw new RuntimeException(conversionError);
            }
        } else {
            schemaData = new String(schemaAndMetadata.schema.getData(), StandardCharsets.UTF_8);
        }
        return GetSchemaResponse.builder()
                .version(getLongSchemaVersion(schemaAndMetadata.version))
                .type(schemaAndMetadata.schema.getType())
                .timestamp(schemaAndMetadata.schema.getTimestamp())
                .data(schemaData)
                .properties(schemaAndMetadata.schema.getProps())
                .build();
    }

    /**
     * Converts a single registry entry into a response.
     *
     * @param schema registry entry, possibly {@code null}
     * @return the REST representation of {@code schema}
     * @throws RestException with {@code NOT_FOUND} when {@code schema} is null or marked deleted
     */
    protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
        if (isNull(schema)) {
            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
        }
        if (schema.schema.isDeleted()) {
            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted");
        }
        return convertSchemaAndMetadataToGetSchemaResponse(schema);
    }

    /**
     * Converts a list of registry entries into a response holding one element per entry.
     *
     * @param schemas registry entries, possibly {@code null}
     * @return the REST representation of all entries
     * @throws RestException with {@code NOT_FOUND} when {@code schemas} is null
     */
    protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List<SchemaAndMetadata> schemas) {
        if (isNull(schemas)) {
            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found");
        }
        List<GetSchemaResponse> responses = schemas.stream()
                .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
                .collect(Collectors.toList());
        return GetAllVersionsSchemaResponse.builder()
                .getSchemaResponses(responses)
                .build();
    }

    /** Validates topic ownership, then admin access for the topic's tenant. */
    private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
        return validateTopicOwnershipAsync(topicName, authoritative)
                .thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
    }

    /** Validates topic ownership, then that the given topic operation is allowed. */
    private CompletableFuture<Void> validateOwnershipAndOperationAsync(boolean authoritative,
                                                                       TopicOperation operation) {
        return validateTopicOwnershipAsync(topicName, authoritative)
                .thenCompose(__ -> validateTopicOperationAsync(topicName, operation));
    }

    /**
     * Decides whether a failure deserves an error log entry.
     *
     * @param ex failure to classify
     * @return {@code false} when {@code ex} is classified as a redirect or a not-found, {@code true} otherwise
     */
    protected boolean shouldPrintErrorLog(Throwable ex) {
        return !(isRedirectException(ex) || isNotFoundException(ex));
    }
}