| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.rocketmq.schema.registry.client; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import lombok.AllArgsConstructor; |
| import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException; |
| import org.apache.rocketmq.schema.registry.client.rest.RestService; |
| import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse; |
| import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse; |
| import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest; |
| import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse; |
| import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; |
| import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest; |
| import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| public class CachedSchemaRegistryClient implements SchemaRegistryClient { |
| |
| private static final String DEFAULT_TENANT = "default"; |
| private static final String DEFAULT_CLUSTER = "default"; |
| private static final int DEFAULT_CAPACITY = 100; |
| private static final int DEFAULT_DURATION = 10; |
| |
| private final RestService restService; |
| |
| /** |
| * when deleting schema by subject, use these maps to invalidate all caches by KEY SubjectAndId / SubjectAndVersion / SubjectAndSchema |
| */ |
| private final Map<String, Set<Long>> subjectToId; // restore recordIds that cached in SubjectAndId, used when delete all subject caches |
| |
| private final Map<String, Set<Long>> subjectToVersion; // restore versions that cached in SubjectAndVersion, used when delete all subject caches |
| |
| private final Map<String, Set<String>> subjectToSchema; // restore schema that cached in SubjectAndSchema, used when delete all subject caches |
| |
| private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion; |
| |
| private final Cache<String, List<String>> subjectCache; //cache for subject |
| |
| private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId; |
| |
| private final Cache<String, GetSchemaResponse> schemaCacheBySubject; //schema cache by Subject only |
| |
| private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; //schema cache by SubjectAndSchema |
| |
| private final Cache<String, List<String>> tenantCache; |
| |
| public CachedSchemaRegistryClient(RestService restService) { |
| this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION); |
| } |
| |
| public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) { |
| this.restService = restService; |
| subjectToId = new HashMap<>(); |
| subjectToVersion = new HashMap<>(); |
| subjectToSchema = new HashMap<>(); |
| this.subjectCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build(); |
| this.schemaCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build(); |
| this.tenantCache = CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(1, unit).build(); |
| this.schemaCacheBySubjectAndVersion = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build(); |
| this.schemaCacheBySubjectAndId = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build(); |
| this.schemaCacheBySubject = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build(); |
| } |
| |
| @Override |
| public RegisterSchemaResponse registerSchema(String subject, String schemaName, |
| RegisterSchemaRequest request) throws RestClientException, IOException { |
| return restService.registerSchema(subject, schemaName, request); |
| } |
| |
| @Override |
| public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName, |
| String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException { |
| return restService.registerSchema(clusterName, tenant, subjectName, schemaName, request); |
| } |
| |
| @Override |
| public DeleteSchemeResponse deleteSchema(String cluster, String tenant, |
| String subject) throws IOException, RestClientException { |
| String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| |
| schemaCacheBySubject.invalidate(subjectFullName); |
| //invalidate schemaCacheBySubjectAndVersion |
| if (subjectToVersion.get(subjectFullName) != null) { |
| subjectToVersion.get(subjectFullName).forEach( |
| version -> schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version))); |
| } |
| //invalidate schemaCacheBySubjectAndId |
| if (subjectToId.get(subjectFullName) != null) { |
| subjectToId.get(subjectFullName).forEach( |
| recordId -> schemaCacheBySubjectAndId.invalidate(new SubjectAndId(cluster, tenant, subject, recordId))); |
| } |
| // invalidate schemaCache |
| if (subjectToSchema.get(subjectFullName) != null) { |
| subjectToSchema.get(subjectFullName).forEach( |
| schema -> schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subject, schema))); |
| } |
| subjectToVersion.remove(subjectFullName); |
| subjectToId.remove(subjectFullName); |
| subjectToSchema.remove(subjectFullName); |
| return restService.deleteSchema(cluster, tenant, subject); |
| } |
| |
| @Override |
| public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject, |
| long version) throws IOException, RestClientException { |
| String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| schemaCacheBySubject.invalidate(subjectFullName); |
| schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version)); |
| if (subjectToVersion.containsKey(subjectFullName)) { |
| subjectToVersion.get(subjectFullName).remove(version); |
| } |
| return restService.deleteSchema(cluster, tenant, subject, version); |
| } |
| |
| @Override |
| public UpdateSchemaResponse updateSchema(String subject, String schemaName, |
| UpdateSchemaRequest request) throws RestClientException, IOException { |
| // invalidate schemaCache |
| schemaCache.invalidate(new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName)); |
| return restService.updateSchema(subject, schemaName, request); |
| } |
| |
| @Override |
| public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName, |
| String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException { |
| // invalidate schemaCache |
| schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subjectName, schemaName)); |
| return restService.updateSchema(cluster, tenant, subjectName, schemaName, request); |
| } |
| |
| @Override |
| public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException { |
| String fullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject); |
| GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName); |
| if (result != null) { |
| return result; |
| } |
| result = restService.getSchemaBySubject(subject); |
| schemaCacheBySubject.put(fullName, result); |
| return result; |
| } |
| |
| @Override |
| public GetSchemaResponse getSchemaBySubject(String cluster, String tenant, |
| String subject) throws RestClientException, IOException { |
| String fullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName); |
| if (result != null) { |
| return result; |
| } |
| result = restService.getSchemaBySubject(cluster, tenant, subject); |
| schemaCacheBySubject.put(fullName, result); |
| return result; |
| } |
| |
| @Override |
| public GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String tenant, String subject, |
| long version) throws IOException, RestClientException { |
| SubjectAndVersion subjectAndVersion = new SubjectAndVersion(cluster, tenant, subject, version); |
| GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion); |
| if (result != null) { |
| return result; |
| } |
| |
| String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| Set<Long> versions = subjectToId.get(subjectFullName); |
| if (versions == null) { |
| versions = new HashSet<>(); |
| } |
| versions.add(version); |
| subjectToId.put(subjectFullName, versions); |
| |
| result = restService.getSchemaBySubject(cluster, tenant, subject, version); |
| schemaCacheBySubjectAndVersion.put(subjectAndVersion, result); |
| return result; |
| } |
| |
| public GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long version) |
| throws IOException, RestClientException { |
| SubjectAndVersion subjectAndVersion = new SubjectAndVersion(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, version); |
| GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion); |
| if (result != null) { |
| return result; |
| } |
| |
| String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject); |
| Set<Long> versions = subjectToId.get(subjectFullName); |
| if (versions == null) { |
| versions = new HashSet<>(); |
| } |
| versions.add(version); |
| subjectToId.put(subjectFullName, versions); |
| |
| result = restService.getSchemaBySubject(subject, version); |
| schemaCacheBySubjectAndVersion.put(subjectAndVersion, result); |
| return result; |
| } |
| |
| @Override |
| public GetSchemaResponse getTargetSchema(String cluster, String tenant, String subject, String schema) |
| throws RestClientException, IOException { |
| SubjectAndSchema subjectAndSchema = new SubjectAndSchema(cluster, tenant, subject, schema); |
| GetSchemaResponse result = schemaCache.getIfPresent(subjectAndSchema); |
| if (result != null) { |
| return result; |
| } |
| String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| result = restService.getTargetSchema(cluster, tenant, subject, schema); |
| schemaCache.put(subjectAndSchema, result); |
| |
| Set<String> schemas = subjectToSchema.get(subjectFullName); |
| if (schemas == null) { |
| schemas = new HashSet<>(); |
| } |
| schemas.add(schema); |
| subjectToSchema.put(subjectFullName, schemas); |
| |
| return result; |
| } |
| |
| @Override |
| public GetSchemaResponse getTargetSchema(String subject, String schema) |
| throws RestClientException, IOException { |
| SubjectAndSchema subjectAndSchema = new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schema); |
| GetSchemaResponse result = schemaCache.getIfPresent(subjectAndSchema); |
| if (result != null) { |
| return result; |
| } |
| result = restService.getTargetSchema(subject, schema); |
| schemaCache.put(subjectAndSchema, result); |
| |
| String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject); |
| Set<String> schemas = subjectToSchema.get(subjectFullName); |
| if (schemas == null) { |
| schemas = new HashSet<>(); |
| } |
| schemas.add(schema); |
| subjectToSchema.put(subjectFullName, schemas); |
| |
| return result; |
| } |
| |
| @Override |
| public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant, |
| String subject) throws RestClientException, IOException { |
| return restService.getSchemaListBySubject(cluster, tenant, subject); |
| } |
| |
| @Override |
| public List<String> getSubjectsByTenant(String cluster, String tenant) |
| throws RestClientException, IOException { |
| String fullName = String.format("%s/%s", cluster, tenant); |
| List<String> result = subjectCache.getIfPresent(fullName); |
| if (!result.isEmpty()) { |
| return result; |
| } |
| result = restService.getSubjectsByTenant(cluster, tenant); |
| subjectCache.put(fullName, result); |
| return result; |
| } |
| |
| @Override |
| public List<String> getAllTenants(String cluster) throws RestClientException, IOException { |
| List<String> result = tenantCache.getIfPresent(cluster); |
| if (result != null) { |
| return result; |
| } |
| result = restService.getAllTenants(cluster); |
| tenantCache.put(cluster, result); |
| return result; |
| } |
| |
| public GetSchemaResponse getSchemaByRecordId(String cluster, String tenant, String subject, |
| long recordId) throws RestClientException, IOException { |
| SubjectAndId subjectAndId = new SubjectAndId(cluster, tenant, subject, recordId); |
| GetSchemaResponse result = schemaCacheBySubjectAndId.getIfPresent(subjectAndId); |
| if (result != null) { |
| return result; |
| } |
| String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject); |
| |
| Set<Long> recordIds = subjectToId.get(subjectFullName); |
| if (recordIds == null) { |
| recordIds = new HashSet<>(); |
| } |
| recordIds.add(recordId); |
| subjectToId.put(subjectFullName, recordIds); |
| |
| result = restService.getSchemaByRecordId(cluster, tenant, subject, recordId); |
| schemaCacheBySubjectAndId.put(subjectAndId, result); |
| return result; |
| } |
| |
| @Override |
| public GetSchemaResponse getSchemaByRecordId(String subject, long recordId) |
| throws RestClientException, IOException { |
| SubjectAndId subjectAndId = new SubjectAndId(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, recordId); |
| GetSchemaResponse result = schemaCacheBySubjectAndId.getIfPresent(subjectAndId); |
| if (result != null) { |
| return result; |
| } |
| |
| String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject); |
| |
| Set<Long> recordIds = subjectToId.get(subjectFullName); |
| if (recordIds == null) { |
| recordIds = new HashSet<>(); |
| } |
| recordIds.add(recordId); |
| subjectToId.put(subjectFullName, recordIds); |
| |
| result = restService.getSchemaByRecordId(subject, recordId); |
| schemaCacheBySubjectAndId.put(subjectAndId, result); |
| return result; |
| } |
| |
| @AllArgsConstructor |
| static class SubjectAndId { |
| private String cluster; |
| private String tenant; |
| private String subject; |
| private long recordId; |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SubjectAndId that = (SubjectAndId) o; |
| return Objects.equals(subject, that.subject) |
| && Objects.equals(tenant, that.tenant) |
| && Objects.equals(cluster, that.cluster) |
| && recordId == that.recordId; |
| } |
| |
| public String getCluster() { |
| return cluster; |
| } |
| |
| public String getTenant() { |
| return tenant; |
| } |
| |
| public String getSubject() { |
| return subject; |
| } |
| |
| public long getRecordId() { |
| return recordId; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(cluster, tenant, subject, recordId); |
| } |
| |
| @Override |
| public String toString() { |
| return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", recordId=" + recordId + '}'; |
| } |
| } |
| |
| @AllArgsConstructor |
| static class SubjectAndVersion { |
| private String cluster; |
| private String tenant; |
| private String subject; |
| private long version; |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SubjectAndVersion that = (SubjectAndVersion) o; |
| return Objects.equals(subject, that.subject) |
| && Objects.equals(tenant, that.tenant) |
| && Objects.equals(cluster, that.cluster) |
| && version == that.version; |
| } |
| |
| public String getCluster() { |
| return cluster; |
| } |
| |
| public String getTenant() { |
| return tenant; |
| } |
| |
| public String getSubject() { |
| return subject; |
| } |
| |
| public long getVersion() { |
| return version; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(cluster, tenant, subject, version); |
| } |
| |
| @Override |
| public String toString() { |
| return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", version=" + version + '}'; |
| } |
| } |
| |
| @AllArgsConstructor |
| static class SubjectAndSchema { |
| private String cluster; |
| private String tenant; |
| private String subject; |
| private String schema; |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SubjectAndSchema that = (SubjectAndSchema) o; |
| return Objects.equals(subject, that.subject) |
| && Objects.equals(tenant, that.tenant) |
| && Objects.equals(cluster, that.cluster) |
| && Objects.equals(schema, that.schema); |
| } |
| |
| public String getCluster() { |
| return cluster; |
| } |
| |
| public String getTenant() { |
| return tenant; |
| } |
| |
| public String getSubject() { |
| return subject; |
| } |
| |
| public String getSchema() { |
| return schema; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(cluster, tenant, subject, schema); |
| } |
| |
| @Override |
| public String toString() { |
| return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", schema=" + schema + '}'; |
| } |
| } |
| } |
| |