blob: cce5cd2fb13b6961744b23eccbb021adf72a191c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;
import static java.nio.charset.StandardCharsets.UTF_8;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
public class SchemasImpl extends BaseResource implements Schemas {
private final WebTarget target;
public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
this.target = web.path("/admin/v2/schemas");
}
@Override
public SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version)))
.get(GetSchemaResponse.class);
return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void deleteSchema(String topic) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
request(schemaPath(tn)).delete(DeleteSchemaResponse.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType(schemaInfo.getType().name());
payload.setProperties(schemaInfo.getProperties());
// for backward compatibility concern, we convert `bytes` to `string`
// we can consider fixing it in a new version of rest endpoint
payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
createSchema(topic, payload);
}
@Override
public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
request(schemaPath(tn))
.post(Entity.json(payload), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
private WebTarget schemaPath(TopicName topicName) {
return target
.path(topicName.getTenant())
.path(topicName.getNamespacePortion())
.path(topicName.getEncodedLocalName())
.path("schema");
}
// the util function converts `GetSchemaResponse` to `SchemaInfo`
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
GetSchemaResponse response) {
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes(UTF_8));
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
}
// the util function exists for backward compatibility concern
static String convertSchemaDataToStringLegacy(byte[] schemaData) {
if (null == schemaData) {
return "";
}
return new String(schemaData, UTF_8);
}
}