blob: 1d94c570344c99468f4f207866ac518b3993b07d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.schema.registry.storage.jdbc;
import org.apache.avro.Schema;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaType;
import org.apache.rocketmq.schema.registry.common.storage.StorageService;
import org.apache.rocketmq.schema.registry.storage.jdbc.handler.IHandler;
import org.apache.rocketmq.schema.registry.storage.jdbc.handler.SchemaHandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* mysql storage service
*/
public class JdbcStorageService
implements StorageService<SchemaInfo> {
private final String hazelcastYamlConfigPath;
private final IHandler handler;
public JdbcStorageService(StoragePluginContext context) {
this.hazelcastYamlConfigPath = context.getConfig().getStorageConfigPath();
this.handler = new SchemaHandler(this.hazelcastYamlConfigPath);
}
@Override
public SchemaInfo register(StorageServiceContext context, SchemaInfo schemaInfo) {
try {
handler.register(schemaInfo);
return schemaInfo;
} catch (Exception e) {
throw new SchemaException("Registry schema failed", e);
}
}
@Override
public void delete(StorageServiceContext context, QualifiedName name) {
if (name.getVersion() == null) {
handler.deleteBySubject(name);
} else {
handler.deleteByVersion(name);
}
}
@Override
public SchemaInfo update(StorageServiceContext context, SchemaInfo schemaInfo) {
handler.updateSchema(schemaInfo);
return schemaInfo;
}
@Override
public SchemaInfo get(StorageServiceContext context, QualifiedName qualifiedName) {
return handler.getSchema(qualifiedName);
}
@Override
public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
if (qualifiedName.getVersion() == null) {
SchemaRecordInfo result = handler.getBySubject(qualifiedName.subjectFullName());
return result;
}
// schema version is given
SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
return null;
}
Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords()
.stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity()));
return versionSchemaMap.get(qualifiedName.getVersion());
}
@Override
public SchemaRecordInfo getTargetSchema(StorageServiceContext context, QualifiedName qualifiedName) {
// schema version is given
SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
return null;
}
SchemaMetaInfo schemaMetaInfo = schemaInfo.getMeta();
if (schemaMetaInfo == null) {
return null;
}
if (schemaMetaInfo.getType() == SchemaType.AVRO) {
for (SchemaRecordInfo schemaRecordInfo : schemaInfo.getDetails().getSchemaRecords()) {
Schema store = new Schema.Parser().parse(schemaRecordInfo.getIdl());
Schema target = new Schema.Parser().parse(qualifiedName.getSchema());
if (Objects.equals(store, target)) {
return schemaRecordInfo;
}
}
} else {
//todo support other type
return null;
}
return null;
}
@Override
public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null) {
return null;
}
return schemaInfo.getDetails().getSchemaRecords();
}
@Override
public List<String> listSubjectsByTenant(StorageServiceContext context, QualifiedName qualifiedName) {
return handler.getSubjects(context, qualifiedName.getTenant());
}
@Override
public List<String> listTenants(StorageServiceContext storageService, QualifiedName qualifiedName) {
return handler.getTenants(qualifiedName.getCluster());
}
@Override
public void close() throws IOException {
if (this.handler != null) {
this.handler.close();
}
}
}