blob: 4527437bf54138ff7e5f42d0cc30daaa48675fba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.schema.registry.storage.jdbc.handler;
import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.map.listener.EntryAddedListener;
import com.hazelcast.map.listener.EntryRemovedListener;
import com.hazelcast.map.listener.EntryUpdatedListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
import org.springframework.util.CollectionUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSchemaMapStore.SCHEMAS;
import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSubjectMapStore.SUBJECTS;
@Slf4j
public class SchemaHandler extends IHandler {
private final IMap<String, SchemaInfo> schemas;
private final IMap<String, SchemaRecordInfo> subjects;
private final HazelcastInstance hazelcastInstance;
public SchemaHandler(String hazelcastYamlConfigPath) {
Config config;
try {
config = Config.loadFromFile(new File(hazelcastYamlConfigPath));
} catch (FileNotFoundException e) {
throw new SchemaException(String.format("File [%s] not found", hazelcastYamlConfigPath), e);
}
this.hazelcastInstance = Hazelcast.newHazelcastInstance(config);
this.subjects = this.hazelcastInstance.getMap(SUBJECTS);
this.subjects.loadAll(true);
this.schemas = this.hazelcastInstance.getMap(SCHEMAS);
this.schemas.loadAll(true);
this.schemas.addEntryListener(new SchemaChangeEntryListener(this.subjects), true);
loadAllSubject();
}
private void loadAllSubject() {
for (Map.Entry<String, SchemaInfo> schema : schemas.entrySet()) {
SchemaInfo schemaInfo = schema.getValue();
List<SchemaRecordInfo> allSchemaRecords = schemaInfo.getDetails().getSchemaRecords();
for (SchemaRecordInfo record : allSchemaRecords) {
List<String> recordSubjects =
record.getSubjects().stream().map(SubjectInfo::fullName).collect(Collectors.toList());
recordSubjects.forEach(subject -> {
subjects.put(subject, record);
});
}
}
}
@Override
public void register(SchemaInfo schema) {
if (schemas.containsKey(schema.schemaFullName())) {
log.error(String.format("Schema %s already exists, registration failed", schema.schemaFullName()));
return;
}
schemas.put(schema.schemaFullName(), schema);
}
/**
* update schema
*
* @param update
*/
@Override
public void updateSchema(SchemaInfo update) {
if (!schemas.containsKey(update.schemaFullName())) {
log.warn(String.format("Schema %s does not exist, update failed", update.schemaFullName()));
return;
}
// Get lock
schemas.lock(update.schemaFullName());
try {
SchemaInfo current = schemas.get(update.schemaFullName());
boolean hasVersionDeleted = current.getRecordCount() > update.getRecordCount();
if (current.getLastModifiedTime() != null && update.getLastModifiedTime() != null &&
current.getLastModifiedTime().after(update.getLastModifiedTime())) {
log.info("Current Schema is later version, no need to update.");
return;
}
if (current.getLastRecordVersion() == update.getLastRecordVersion() && !hasVersionDeleted) {
log.info("Schema version is the same, no need to update.");
return;
}
if (current.getLastRecordVersion() > update.getLastRecordVersion() && !hasVersionDeleted) {
throw new SchemaException("Schema version is invalid, update: "
+ update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
}
schemas.put(update.schemaFullName(), update);
} finally {
// unlock
schemas.unlock(update.schemaFullName());
}
}
@Override
public void deleteSchema(QualifiedName qualifiedName) {
schemas.lock(qualifiedName.schemaFullName());
try {
if (!schemas.containsKey(qualifiedName.schemaFullName())) {
log.error(String.format("Schema %s does not exist, delete failed", qualifiedName.schemaFullName()));
return;
}
schemas.delete(qualifiedName);
} finally {
schemas.unlock(qualifiedName.schemaFullName());
}
}
@Override
public void deleteBySubject(QualifiedName qualifiedName) {
schemas.lock(qualifiedName.subjectFullName());
try {
SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null) {
log.error(String.format("Schema %s does not exist, delete failed",
qualifiedName.subjectFullName()));
return;
}
schemas.delete(schemaInfo.schemaFullName());
} finally {
schemas.unlock(qualifiedName.subjectFullName());
}
}
@Override
public void deleteByVersion(QualifiedName name) {
SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
log.error(String.format("Schema %s does not exist, failed to delete according to version",
name.subjectFullName()));
return;
}
List<SubjectInfo> subjects = schemaInfo.getLastRecord().getSubjects();
List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
if (CollectionUtils.isEmpty(schemaRecords)) {
deleteBySubject(name);
}
if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
schemaInfo.getLastRecord().setSubjects(subjects);
}
updateSchema(schemaInfo);
}
@Override
public SchemaInfo getSchema(QualifiedName qualifiedName) {
return schemas.get(qualifiedName.schemaFullName());
}
@Override
public SchemaRecordInfo getBySubject(String subjectFullName) {
subjects.lock(subjectFullName);
try {
if (!subjects.containsKey(subjectFullName)) {
return null;
}
return subjects.get(subjectFullName);
} finally {
subjects.unlock(subjectFullName);
}
}
@Override
public SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
SchemaRecordInfo subjectRecordInfo = subjects.get(subjectFullName);
if (subjectRecordInfo == null) {
return null;
}
String schemaFullName = subjectRecordInfo.getSchema();
return schemas.get(schemaFullName);
}
@Override
public List<String> getSubjects(StorageServiceContext context, String tenant) {
List<String> allSubjects = new ArrayList<>();
for (Map.Entry<String, SchemaRecordInfo> schemaRecordEntry : subjects.entrySet()) {
String subjectFullName = schemaRecordEntry.getKey();
String[] subjectFromCache = subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR));
String subjectFromKey = subjectFromCache[2];
// Check permission
allSubjects.add(subjectFromKey);
}
return allSubjects;
}
@Override
public List<String> getTenants(String cluster) {
List<String> tenants = Lists.newArrayList();
for (Map.Entry<String, SchemaRecordInfo> schemaRecordEntry : subjects.entrySet()) {
String subjectFullName = schemaRecordEntry.getKey();
String tenant = subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR))[1];
tenants.add(tenant);
}
return tenants;
}
@Override
public void close() throws IOException {
Hazelcast.shutdownAll();
}
/**
* Schema change entry listener
*/
static class SchemaChangeEntryListener implements EntryAddedListener<String, SchemaInfo>, EntryRemovedListener<String, SchemaInfo>, EntryUpdatedListener<String, SchemaInfo> {
private final IMap<String, SchemaRecordInfo> subjectCache;
SchemaChangeEntryListener(IMap<String, SchemaRecordInfo> localCache) {
this.subjectCache = localCache;
}
@Override
public void entryAdded(EntryEvent<String, SchemaInfo> current) {
SchemaInfo schemaInfo = current.getValue();
subjectCache.put(schemaInfo.subjectFullName(), schemaInfo.getLastRecord());
}
@Override
public void entryRemoved(EntryEvent<String, SchemaInfo> current) {
if (current == null || current.getValue() == null || current.getValue().getDetails() == null) {
return;
}
// Delete subjects bind to any version
List<SchemaRecordInfo> allSchemaRecords = current.getValue().getDetails().getSchemaRecords();
List<String> allSubjects = allSchemaRecords.parallelStream()
.flatMap(record -> record.getSubjects().stream().map(SubjectInfo::fullName))
.collect(Collectors.toList());
allSubjects.forEach(subject -> {
subjectCache.remove(subject);
});
}
@Override
public void entryUpdated(EntryEvent<String, SchemaInfo> update) {
if (update == null || update.getValue() == null || update.getValue().getLastRecord() == null) {
return;
}
SchemaRecordInfo lastSchemaRecordInfo = update.getValue().getLastRecord();
lastSchemaRecordInfo.getSubjects().forEach(subject -> {
subjectCache.put(subject.fullName(), update.getValue().getLastRecord());
});
}
}
}