| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.skywalking.oap.server.core.storage.model; |
| |
| import java.lang.reflect.Field; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Objects; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; |
| import org.apache.skywalking.oap.server.core.storage.StorageException; |
| import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; |
| import org.apache.skywalking.oap.server.core.storage.annotation.Column; |
| import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch; |
| import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase; |
| import org.apache.skywalking.oap.server.core.storage.annotation.Storage; |
| import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset; |
| import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata; |
| import org.apache.skywalking.oap.server.library.util.StringUtil; |
| |
| /** |
| * StorageModels manages all models detected by the core. |
| */ |
| @Slf4j |
| public class StorageModels implements IModelManager, ModelCreator, ModelManipulator { |
| private final List<Model> models; |
| private final HashMap<String, String> columnNameOverrideRule; |
| private final List<CreatingListener> listeners; |
| |
| public StorageModels() { |
| this.models = new ArrayList<>(); |
| this.columnNameOverrideRule = new HashMap<>(); |
| this.listeners = new ArrayList<>(); |
| } |
| |
| @Override |
| public Model add(Class<?> aClass, int scopeId, Storage storage, boolean record) throws StorageException { |
| // Check this scope id is valid. |
| DefaultScopeDefine.nameOf(scopeId); |
| |
| List<ModelColumn> modelColumns = new ArrayList<>(); |
| ShardingKeyChecker checker = new ShardingKeyChecker(); |
| SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension(); |
| retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record); |
| checker.check(storage.getModelName()); |
| |
| Model model = new Model( |
| storage.getModelName(), |
| modelColumns, |
| scopeId, |
| storage.getDownsampling(), |
| record, |
| isSuperDatasetModel(aClass), |
| aClass, |
| storage.isTimeRelativeID(), |
| sqlDBModelExtension |
| ); |
| |
| this.followColumnNameRules(model); |
| models.add(model); |
| |
| for (final CreatingListener listener : listeners) { |
| listener.whenCreating(model); |
| } |
| return model; |
| } |
| |
| private boolean isSuperDatasetModel(Class<?> aClass) { |
| return aClass.isAnnotationPresent(SuperDataset.class); |
| } |
| |
| /** |
| * CreatingListener listener could react when {@link #add(Class, int, Storage, boolean)} model happens. Also, the |
| * added models are being notified in this add operation. |
| */ |
| @Override |
| public void addModelListener(final CreatingListener listener) throws StorageException { |
| listeners.add(listener); |
| for (Model model : models) { |
| listener.whenCreating(model); |
| } |
| } |
| |
| /** |
| * Read model column metadata based on the class level definition. |
| */ |
| private void retrieval(final Class<?> clazz, |
| final String modelName, |
| final List<ModelColumn> modelColumns, |
| final int scopeId, |
| ShardingKeyChecker checker, |
| final SQLDatabaseModelExtension sqlDBModelExtension, |
| boolean record) { |
| if (log.isDebugEnabled()) { |
| log.debug("Analysis {} to generate Model.", clazz.getName()); |
| } |
| |
| Field[] fields = clazz.getDeclaredFields(); |
| |
| for (Field field : fields) { |
| if (field.isAnnotationPresent(Column.class)) { |
| if (field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) { |
| if (!record) { |
| throw new IllegalStateException("Model [" + modelName + "] is not a Record, @SQLDatabase.AdditionalEntity only supports Record."); |
| } |
| } |
| |
| Column column = field.getAnnotation(Column.class); |
| // Use the column#length as the default column length, as read the system env as the override mechanism. |
| // Log the error but don't block the startup sequence. |
| int columnLength = column.length(); |
| final String lengthEnvVariable = column.lengthEnvVariable(); |
| if (StringUtil.isNotEmpty(lengthEnvVariable)) { |
| final String envValue = System.getenv(lengthEnvVariable); |
| if (StringUtil.isNotEmpty(envValue)) { |
| try { |
| columnLength = Integer.parseInt(envValue); |
| } catch (NumberFormatException e) { |
| log.error("Model [{}] Column [{}], illegal value {} of column length from system env [{}]", |
| modelName, column.columnName(), envValue, lengthEnvVariable |
| ); |
| } |
| } |
| } |
| |
| // SQL Database extension |
| SQLDatabaseExtension sqlDatabaseExtension = new SQLDatabaseExtension(); |
| List<SQLDatabase.QueryUnifiedIndex> indexDefinitions = new ArrayList<>(); |
| if (field.isAnnotationPresent(SQLDatabase.QueryUnifiedIndex.class)) { |
| indexDefinitions.add(field.getAnnotation(SQLDatabase.QueryUnifiedIndex.class)); |
| } |
| |
| if (field.isAnnotationPresent(SQLDatabase.MultipleQueryUnifiedIndex.class)) { |
| Collections.addAll( |
| indexDefinitions, field.getAnnotation(SQLDatabase.MultipleQueryUnifiedIndex.class).value()); |
| } |
| |
| indexDefinitions.forEach(indexDefinition -> { |
| sqlDatabaseExtension.appendIndex(new SQLDatabaseExtension.MultiColumnsIndex( |
| column.columnName(), |
| indexDefinition.withColumns() |
| )); |
| }); |
| |
| // ElasticSearch extension |
| final ElasticSearch.MatchQuery elasticSearchAnalyzer = field.getAnnotation( |
| ElasticSearch.MatchQuery.class); |
| ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension( |
| elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer() |
| ); |
| |
| // BanyanDB extension |
| final BanyanDB.ShardingKey banyanDBShardingKey = field.getAnnotation( |
| BanyanDB.ShardingKey.class); |
| final BanyanDB.GlobalIndex banyanDBGlobalIndex = field.getAnnotation( |
| BanyanDB.GlobalIndex.class); |
| final BanyanDB.NoIndexing banyanDBNoIndex = field.getAnnotation( |
| BanyanDB.NoIndexing.class); |
| BanyanDBExtension banyanDBExtension = new BanyanDBExtension( |
| banyanDBShardingKey == null ? -1 : banyanDBShardingKey.index(), |
| banyanDBGlobalIndex == null ? false : true, |
| banyanDBNoIndex != null ? false : column.storageOnly() |
| ); |
| |
| final ModelColumn modelColumn = new ModelColumn( |
| new ColumnName( |
| modelName, |
| column.columnName() |
| ), |
| field.getType(), |
| field.getGenericType(), |
| column.storageOnly(), |
| column.indexOnly(), |
| column.dataType().isValue(), |
| columnLength, |
| sqlDatabaseExtension, |
| elasticSearchExtension, |
| banyanDBExtension |
| ); |
| if (banyanDBExtension.isShardingKey()) { |
| checker.accept(modelName, modelColumn); |
| } |
| |
| if (field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) { |
| String[] tableNames = field.getAnnotation(SQLDatabase.AdditionalEntity.class).additionalTables(); |
| for (final String tableName : tableNames) { |
| sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn); |
| } |
| if (!field.getAnnotation(SQLDatabase.AdditionalEntity.class).reserveOriginalColumns()) { |
| sqlDBModelExtension.appendExcludeColumns(modelColumn); |
| } |
| } |
| |
| modelColumns.add(modelColumn); |
| if (log.isDebugEnabled()) { |
| log.debug("The field named [{}] with the [{}] type", column.columnName(), field.getType()); |
| } |
| if (column.dataType().isValue()) { |
| ValueColumnMetadata.INSTANCE.putIfAbsent( |
| modelName, column.columnName(), column.dataType(), column.function(), |
| column.defaultValue(), scopeId |
| ); |
| } |
| } |
| } |
| |
| if (Objects.nonNull(clazz.getSuperclass())) { |
| retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension, record); |
| } |
| } |
| |
| @Override |
| public void overrideColumnName(String columnName, String newName) { |
| columnNameOverrideRule.put(columnName, newName); |
| models.forEach(this::followColumnNameRules); |
| ValueColumnMetadata.INSTANCE.overrideColumnName(columnName, newName); |
| } |
| |
| private void followColumnNameRules(Model model) { |
| columnNameOverrideRule.forEach((oldName, newName) -> { |
| model.getColumns().forEach(column -> { |
| column.getColumnName().overrideName(oldName, newName); |
| column.getSqlDatabaseExtension() |
| .getIndices() |
| .forEach(extraQueryIndex -> extraQueryIndex.overrideName(oldName, newName)); |
| }); |
| }); |
| } |
| |
| @Override |
| public List<Model> allModels() { |
| return models; |
| } |
| |
| private class ShardingKeyChecker { |
| private ArrayList<ModelColumn> keys = new ArrayList<>(); |
| |
| /** |
| * @throws IllegalStateException if sharding key indices are conflicting. |
| */ |
| private void accept(String modelName, ModelColumn modelColumn) throws IllegalStateException { |
| final int idx = modelColumn.getBanyanDBExtension().getShardingKeyIdx(); |
| while (idx + 1 > keys.size()) { |
| keys.add(null); |
| } |
| ModelColumn exist = keys.get(idx); |
| if (exist != null) { |
| throw new IllegalStateException( |
| modelName + "'s " |
| + "Column [" + exist.getColumnName() + "] and column [" + modelColumn.getColumnName() |
| + " are conflicting with sharding key index=" + modelColumn.getBanyanDBExtension() |
| .getShardingKeyIdx()); |
| } |
| keys.set(idx, modelColumn); |
| } |
| |
| /** |
| * @param modelName model name of the entity |
| * @throws IllegalStateException if sharding key indices are not continuous |
| */ |
| private void check(String modelName) throws IllegalStateException { |
| for (int i = 0; i < keys.size(); i++) { |
| final ModelColumn modelColumn = keys.get(i); |
| if (modelColumn == null) { |
| throw new IllegalStateException("Sharding key index=" + i + " is missing in " + modelName); |
| } |
| } |
| } |
| } |
| } |