// blob: 2d85c2ea5dac76afd4e3218fbcbba70b0a554c18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.library.util.StringUtil;
/**
 * StorageModels manages all models detected by the core.
 *
 * <p>A {@link Model} is built from the {@link Column}-annotated fields of an entity class and of all its
 * superclasses. Every built model is kept by this manager, gets the registered column name override rules applied,
 * and is reported to the registered creating listeners.
 *
 * <p>NOTE(review): the internal collections are plain {@link ArrayList}/{@link HashMap} instances with no
 * synchronization in this class; presumably all registrations happen from a single thread - confirm with callers.
 */
@Slf4j
public class StorageModels implements IModelManager, ModelCreator, ModelManipulator {
    /** All models added so far, in registration order. */
    private final List<Model> models;
    /** Column name override rules, keyed by the original column name. */
    private final HashMap<String, String> columnNameOverrideRule;
    /** Listeners to notify about every model. */
    private final List<CreatingListener> listeners;

    public StorageModels() {
        this.models = new ArrayList<>();
        this.columnNameOverrideRule = new HashMap<>();
        this.listeners = new ArrayList<>();
    }

    /**
     * Build the model of the given class, register it, and notify all the listeners registered so far.
     *
     * @param aClass  the entity class whose {@link Column} fields define the model columns
     * @param scopeId the scope id of the model
     * @param storage provides the model name, downsampling and time-relative-ID flag
     * @param record  whether the model is a record; required by {@code @SQLDatabase.AdditionalEntity} fields
     * @return the newly created and registered model
     * @throws StorageException      declared by the interface; may be raised by a listener
     * @throws IllegalStateException if the column definitions are inconsistent (sharding keys, additional entity)
     */
    @Override
    public Model add(Class<?> aClass, int scopeId, Storage storage, boolean record) throws StorageException {
        // Check this scope id is valid.
        DefaultScopeDefine.nameOf(scopeId);

        final String modelName = storage.getModelName();
        final List<ModelColumn> modelColumns = new ArrayList<>();
        final ShardingKeyChecker checker = new ShardingKeyChecker();
        final SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
        retrieval(aClass, modelName, modelColumns, scopeId, checker, sqlDBModelExtension, record);
        // All fields of the class hierarchy have been collected, the sharding key indices can be verified now.
        checker.check(modelName);

        final Model model = new Model(
            modelName,
            modelColumns,
            scopeId,
            storage.getDownsampling(),
            record,
            isSuperDatasetModel(aClass),
            aClass,
            storage.isTimeRelativeID(),
            sqlDBModelExtension
        );

        // Apply the override rules registered before this model was added.
        this.followColumnNameRules(model);
        models.add(model);

        for (final CreatingListener listener : listeners) {
            listener.whenCreating(model);
        }
        return model;
    }

    /**
     * @return true if the class itself is annotated with {@link SuperDataset}.
     */
    private boolean isSuperDatasetModel(Class<?> aClass) {
        return aClass.isAnnotationPresent(SuperDataset.class);
    }

    /**
     * CreatingListener listener could react when {@link #add(Class, int, Storage, boolean)} model happens. Also, the
     * models added before this registration are notified to the listener within this call.
     */
    @Override
    public void addModelListener(final CreatingListener listener) throws StorageException {
        listeners.add(listener);
        for (final Model model : models) {
            listener.whenCreating(model);
        }
    }

    /**
     * Read model column metadata based on the class level definition.
     *
     * <p>The declared fields of {@code clazz} are processed first, then the method recurses into the superclass
     * until the top of the hierarchy is reached. Fields without {@link Column} are ignored.
     *
     * @param clazz               the class to analyze
     * @param modelName           the name of the model being built
     * @param modelColumns        output, receives one {@link ModelColumn} per {@link Column} field
     * @param scopeId             the scope id, forwarded to {@link ValueColumnMetadata} for value columns
     * @param checker             collects the columns declared as BanyanDB sharding keys
     * @param sqlDBModelExtension output, receives additional tables and excluded columns
     * @param record              whether the model is a record
     * @throws IllegalStateException if {@code @SQLDatabase.AdditionalEntity} is used on a non-record model, or
     *                               two columns declare the same sharding key index
     */
    private void retrieval(final Class<?> clazz,
                           final String modelName,
                           final List<ModelColumn> modelColumns,
                           final int scopeId,
                           ShardingKeyChecker checker,
                           final SQLDatabaseModelExtension sqlDBModelExtension,
                           boolean record) {
        if (log.isDebugEnabled()) {
            log.debug("Analysis {} to generate Model.", clazz.getName());
        }

        for (final Field field : clazz.getDeclaredFields()) {
            final Column column = field.getAnnotation(Column.class);
            if (column == null) {
                continue;
            }

            // Looked up once, it is used both for the validation and for the additional table registration.
            final SQLDatabase.AdditionalEntity additionalEntity =
                field.getAnnotation(SQLDatabase.AdditionalEntity.class);
            if (additionalEntity != null && !record) {
                throw new IllegalStateException("Model [" + modelName + "] is not a Record, @SQLDatabase.AdditionalEntity only supports Record.");
            }

            final ModelColumn modelColumn = toModelColumn(modelName, field, column);

            if (modelColumn.getBanyanDBExtension().isShardingKey()) {
                checker.accept(modelName, modelColumn);
            }

            if (additionalEntity != null) {
                for (final String tableName : additionalEntity.additionalTables()) {
                    sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn);
                }
                if (!additionalEntity.reserveOriginalColumns()) {
                    sqlDBModelExtension.appendExcludeColumns(modelColumn);
                }
            }

            modelColumns.add(modelColumn);
            if (log.isDebugEnabled()) {
                log.debug("The field named [{}] with the [{}] type", column.columnName(), field.getType());
            }

            if (column.dataType().isValue()) {
                ValueColumnMetadata.INSTANCE.putIfAbsent(
                    modelName, column.columnName(), column.dataType(), column.function(),
                    column.defaultValue(), scopeId
                );
            }
        }

        if (Objects.nonNull(clazz.getSuperclass())) {
            retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension, record);
        }
    }

    /**
     * Build the {@link ModelColumn} of one {@link Column}-annotated field, including its storage extensions.
     */
    private ModelColumn toModelColumn(final String modelName, final Field field, final Column column) {
        final int columnLength = resolveColumnLength(modelName, column);
        final SQLDatabaseExtension sqlDatabaseExtension = buildSqlDatabaseExtension(field, column);
        final ElasticSearchExtension elasticSearchExtension = buildElasticSearchExtension(field);
        final BanyanDBExtension banyanDBExtension = buildBanyanDBExtension(field, column);

        return new ModelColumn(
            new ColumnName(
                modelName,
                column.columnName()
            ),
            field.getType(),
            field.getGenericType(),
            column.storageOnly(),
            column.indexOnly(),
            column.dataType().isValue(),
            columnLength,
            sqlDatabaseExtension,
            elasticSearchExtension,
            banyanDBExtension
        );
    }

    /**
     * Use the column#length as the default column length, and read the system env as the override mechanism.
     * An illegal env value is logged as an error but doesn't block the startup sequence; the default is kept.
     */
    private int resolveColumnLength(final String modelName, final Column column) {
        final String lengthEnvVariable = column.lengthEnvVariable();
        if (StringUtil.isNotEmpty(lengthEnvVariable)) {
            final String envValue = System.getenv(lengthEnvVariable);
            if (StringUtil.isNotEmpty(envValue)) {
                try {
                    return Integer.parseInt(envValue);
                } catch (NumberFormatException e) {
                    // The exception is the last argument, so the logger reports the cause with its stack trace.
                    log.error("Model [{}] Column [{}], illegal value {} of column length from system env [{}]",
                              modelName, column.columnName(), envValue, lengthEnvVariable, e
                    );
                }
            }
        }
        return column.length();
    }

    /**
     * SQL Database extension: one multi-columns index per {@code @SQLDatabase.QueryUnifiedIndex} declared on the
     * field, either directly or inside {@code @SQLDatabase.MultipleQueryUnifiedIndex}.
     */
    private SQLDatabaseExtension buildSqlDatabaseExtension(final Field field, final Column column) {
        final List<SQLDatabase.QueryUnifiedIndex> indexDefinitions = new ArrayList<>();
        final SQLDatabase.QueryUnifiedIndex singleIndex = field.getAnnotation(SQLDatabase.QueryUnifiedIndex.class);
        if (singleIndex != null) {
            indexDefinitions.add(singleIndex);
        }
        final SQLDatabase.MultipleQueryUnifiedIndex multipleIndices =
            field.getAnnotation(SQLDatabase.MultipleQueryUnifiedIndex.class);
        if (multipleIndices != null) {
            Collections.addAll(indexDefinitions, multipleIndices.value());
        }

        final SQLDatabaseExtension sqlDatabaseExtension = new SQLDatabaseExtension();
        for (final SQLDatabase.QueryUnifiedIndex indexDefinition : indexDefinitions) {
            sqlDatabaseExtension.appendIndex(new SQLDatabaseExtension.MultiColumnsIndex(
                column.columnName(),
                indexDefinition.withColumns()
            ));
        }
        return sqlDatabaseExtension;
    }

    /**
     * ElasticSearch extension: the analyzer and the column alias, each {@code null} when the corresponding
     * annotation is absent from the field.
     */
    private ElasticSearchExtension buildElasticSearchExtension(final Field field) {
        final ElasticSearch.MatchQuery elasticSearchAnalyzer = field.getAnnotation(
            ElasticSearch.MatchQuery.class);
        final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
        return new ElasticSearchExtension(
            elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
            elasticSearchColumn == null ? null : elasticSearchColumn.columnAlias()
        );
    }

    /**
     * BanyanDB extension: the sharding key index (-1 when the field is not a sharding key), the global index flag
     * and the third constructor flag derived from {@code @BanyanDB.NoIndexing} and {@link Column#storageOnly()}.
     */
    private BanyanDBExtension buildBanyanDBExtension(final Field field, final Column column) {
        final BanyanDB.ShardingKey banyanDBShardingKey = field.getAnnotation(
            BanyanDB.ShardingKey.class);
        final BanyanDB.GlobalIndex banyanDBGlobalIndex = field.getAnnotation(
            BanyanDB.GlobalIndex.class);
        final BanyanDB.NoIndexing banyanDBNoIndex = field.getAnnotation(
            BanyanDB.NoIndexing.class);
        // NOTE(review): the third argument is false when @BanyanDB.NoIndexing is present, otherwise it is
        // column.storageOnly() as-is. If that constructor parameter means "should index", the polarity of
        // storageOnly looks inverted - confirm against BanyanDBExtension before changing.
        return new BanyanDBExtension(
            banyanDBShardingKey == null ? -1 : banyanDBShardingKey.index(),
            banyanDBGlobalIndex != null,
            banyanDBNoIndex == null && column.storageOnly()
        );
    }

    /**
     * Register a column name override rule, apply all rules to every model added so far and forward the rule to
     * {@link ValueColumnMetadata}.
     *
     * @param columnName the original column name
     * @param newName    the name to use instead
     */
    @Override
    public void overrideColumnName(String columnName, String newName) {
        columnNameOverrideRule.put(columnName, newName);
        models.forEach(this::followColumnNameRules);
        ValueColumnMetadata.INSTANCE.overrideColumnName(columnName, newName);
    }

    /**
     * Apply every registered override rule to the column names and to the SQL database indices of the model.
     */
    private void followColumnNameRules(Model model) {
        columnNameOverrideRule.forEach((oldName, newName) -> {
            model.getColumns().forEach(column -> {
                column.getColumnName().overrideName(oldName, newName);
                column.getSqlDatabaseExtension()
                      .getIndices()
                      .forEach(extraQueryIndex -> extraQueryIndex.overrideName(oldName, newName));
            });
        });
    }

    /**
     * @return the internal list of models; it is returned as-is, not copied.
     */
    @Override
    public List<Model> allModels() {
        return models;
    }

    /**
     * Collects the sharding key columns of one model, positioned by their declared index, and verifies that the
     * indices are neither duplicated nor leaving gaps.
     */
    private static class ShardingKeyChecker {
        /** Sharding key columns by index; {@code null} marks an index not declared (yet). */
        private final List<ModelColumn> keys = new ArrayList<>();

        /**
         * @throws IllegalStateException if sharding key indices are conflicting.
         */
        private void accept(String modelName, ModelColumn modelColumn) throws IllegalStateException {
            final int idx = modelColumn.getBanyanDBExtension().getShardingKeyIdx();
            // Grow the list with placeholders, so the position idx exists.
            while (idx + 1 > keys.size()) {
                keys.add(null);
            }
            final ModelColumn exist = keys.get(idx);
            if (exist != null) {
                throw new IllegalStateException(
                    modelName + "'s "
                        + "Column [" + exist.getColumnName() + "] and column [" + modelColumn.getColumnName()
                        + "] are conflicting with sharding key index=" + idx);
            }
            keys.set(idx, modelColumn);
        }

        /**
         * @param modelName model name of the entity
         * @throws IllegalStateException if sharding key indices are not continuous
         */
        private void check(String modelName) throws IllegalStateException {
            for (int i = 0; i < keys.size(); i++) {
                if (keys.get(i) == null) {
                    throw new IllegalStateException("Sharding key index=" + i + " is missing in " + modelName);
                }
            }
        }
    }
}