blob: 51aadd4c1be2cb06f9f6c66e59392c4f69003868 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.storage.model;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* StorageModels manages all models detected by the core.
*/
@Slf4j
public class StorageModels implements IModelManager, ModelCreator, ModelManipulator {
private final List<Model> models;
private final HashMap<String, String> columnNameOverrideRule;
private final List<CreatingListener> listeners;
public StorageModels() {
this.models = new ArrayList<>();
this.columnNameOverrideRule = new HashMap<>();
this.listeners = new ArrayList<>();
}
@Override
public Model add(Class<?> aClass, int scopeId, Storage storage) throws StorageException {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId);
List<ModelColumn> modelColumns = new ArrayList<>();
ShardingKeyChecker checker = new ShardingKeyChecker();
SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
ElasticSearchModelExtension elasticSearchModelExtension = new ElasticSearchModelExtension();
retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
// Add extra column for additional entities
if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
|| aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
Map<String/*parent column*/, List<String>/*tables*/> extraColumns = new HashMap<>();
if (aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
for (SQLDatabase.ExtraColumn4AdditionalEntity extraColumn : aClass.getAnnotation(
SQLDatabase.MultipleExtraColumn4AdditionalEntity.class).value()) {
List<String> tables = extraColumns.computeIfAbsent(
extraColumn.parentColumn(), v -> new ArrayList<>());
tables.add(extraColumn.additionalTable());
}
} else {
SQLDatabase.ExtraColumn4AdditionalEntity extraColumn = aClass.getAnnotation(
SQLDatabase.ExtraColumn4AdditionalEntity.class);
List<String> tables = extraColumns.computeIfAbsent(extraColumn.parentColumn(), v -> new ArrayList<>());
tables.add(extraColumn.additionalTable());
}
extraColumns.forEach((extraColumn, tables) -> {
if (!addExtraColumn4AdditionalEntity(sqlDBModelExtension, modelColumns, extraColumn, tables)) {
throw new IllegalStateException(
"Model [" + storage.getModelName() + "] defined an extra column [" + extraColumn + "] by @SQLDatabase.ExtraColumn4AdditionalEntity, " +
"but couldn't be found from the parent.");
}
});
}
//Add timestampColumn for BanyanDB
if (aClass.isAnnotationPresent(BanyanDB.TimestampColumn.class)) {
String timestampColumn = aClass.getAnnotation(BanyanDB.TimestampColumn.class).value();
banyanDBModelExtension.setTimestampColumn(timestampColumn);
}
if (aClass.isAnnotationPresent(BanyanDB.StoreIDAsTag.class)) {
banyanDBModelExtension.setStoreIDTag(true);
}
// Set routing rules for ElasticSearch
elasticSearchModelExtension.setRouting(storage.getModelName(), modelColumns);
checker.check(storage.getModelName());
Model model = new Model(
storage.getModelName(),
modelColumns,
scopeId,
storage.getDownsampling(),
isSuperDatasetModel(aClass),
aClass,
storage.isTimeRelativeID(),
sqlDBModelExtension,
banyanDBModelExtension,
elasticSearchModelExtension
);
this.followColumnNameRules(model);
models.add(model);
for (final CreatingListener listener : listeners) {
listener.whenCreating(model);
}
return model;
}
private boolean isSuperDatasetModel(Class<?> aClass) {
return aClass.isAnnotationPresent(SuperDataset.class);
}
/**
* CreatingListener listener could react when {@link ModelCreator#add(Class, int, Storage)} model happens. Also, the
* added models are being notified in this add operation.
*/
@Override
public void addModelListener(final CreatingListener listener) throws StorageException {
listeners.add(listener);
for (Model model : models) {
listener.whenCreating(model);
}
}
/**
* Read model column metadata based on the class level definition.
*/
private void retrieval(final Class<?> clazz,
final String modelName,
final List<ModelColumn> modelColumns,
final int scopeId,
ShardingKeyChecker checker,
final SQLDatabaseModelExtension sqlDBModelExtension,
final BanyanDBModelExtension banyanDBModelExtension) {
if (log.isDebugEnabled()) {
log.debug("Analysis {} to generate Model.", clazz.getName());
}
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(Column.class)) {
if (field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) {
if (!Record.class.isAssignableFrom(clazz)) {
throw new IllegalStateException(
"Model [" + modelName + "] is not a Record, @SQLDatabase.AdditionalEntity only supports Record.");
}
}
Column column = field.getAnnotation(Column.class);
// Use the column#length as the default column length, as read the system env as the override mechanism.
// Log the error but don't block the startup sequence.
int columnLength = column.length();
// SQL Database extension
SQLDatabaseExtension sqlDatabaseExtension = new SQLDatabaseExtension();
List<SQLDatabase.CompositeIndex> indexDefinitions = new ArrayList<>();
if (field.isAnnotationPresent(SQLDatabase.CompositeIndex.class)) {
indexDefinitions.add(field.getAnnotation(SQLDatabase.CompositeIndex.class));
}
if (field.isAnnotationPresent(SQLDatabase.CompositeIndices.class)) {
Collections.addAll(
indexDefinitions, field.getAnnotation(SQLDatabase.CompositeIndices.class).value());
}
indexDefinitions.forEach(indexDefinition -> {
sqlDatabaseExtension.appendIndex(new SQLDatabaseExtension.MultiColumnsIndex(
column.name(),
indexDefinition.withColumns()
));
});
// ElasticSearch extension
final ElasticSearch.MatchQuery elasticSearchAnalyzer = field.getAnnotation(
ElasticSearch.MatchQuery.class);
final ElasticSearch.Column elasticSearchColumn = field.getAnnotation(ElasticSearch.Column.class);
final ElasticSearch.Keyword keywordColumn = field.getAnnotation(ElasticSearch.Keyword.class);
final ElasticSearch.Routing routingColumn = field.getAnnotation(ElasticSearch.Routing.class);
ElasticSearchExtension elasticSearchExtension = new ElasticSearchExtension(
elasticSearchAnalyzer == null ? null : elasticSearchAnalyzer.analyzer(),
elasticSearchColumn == null ? null : elasticSearchColumn.legacyName(),
keywordColumn != null,
routingColumn != null
);
// BanyanDB extension
final BanyanDB.SeriesID banyanDBSeriesID = field.getAnnotation(
BanyanDB.SeriesID.class);
final BanyanDB.NoIndexing banyanDBNoIndex = field.getAnnotation(
BanyanDB.NoIndexing.class);
final BanyanDB.IndexRule banyanDBIndexRule = field.getAnnotation(
BanyanDB.IndexRule.class);
final BanyanDB.MeasureField banyanDBMeasureField = field.getAnnotation(
BanyanDB.MeasureField.class);
final BanyanDB.TopNAggregation topNAggregation = field.getAnnotation(
BanyanDB.TopNAggregation.class);
BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
banyanDBSeriesID == null ? -1 : banyanDBSeriesID.index(),
banyanDBNoIndex == null && !column.storageOnly(),
banyanDBIndexRule == null ? BanyanDB.IndexRule.IndexType.INVERTED : banyanDBIndexRule.indexType(),
banyanDBMeasureField != null
);
if (topNAggregation != null) {
BanyanDBModelExtension.TopN topN = new BanyanDBModelExtension.TopN();
topN.setLruSize(topNAggregation.lruSize());
topN.setCountersNumber(topNAggregation.countersNumber());
topN.setGroupByTagNames(Collections.singletonList(column.name()));
banyanDBModelExtension.setTopN(topN);
}
final ModelColumn modelColumn = new ModelColumn(
new ColumnName(column),
field.getType(),
field.getGenericType(),
column.storageOnly(),
column.indexOnly(),
column.dataType().isValue(),
columnLength,
sqlDatabaseExtension,
elasticSearchExtension,
banyanDBExtension
);
if (banyanDBExtension.isShardingKey()) {
checker.accept(modelName, modelColumn);
}
if (field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) {
final var additionalEntity = field.getAnnotation(SQLDatabase.AdditionalEntity.class);
final var additionalTableNames = additionalEntity.additionalTables();
for (final var tableName : additionalTableNames) {
sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn);
}
if (!additionalEntity.reserveOriginalColumns()) {
sqlDBModelExtension.appendExcludeColumns(modelColumn);
}
}
modelColumns.add(modelColumn);
if (log.isDebugEnabled()) {
log.debug("The field named [{}] with the [{}] type", column.name(), field.getType());
}
if (column.dataType().isValue()) {
ValueColumnMetadata.INSTANCE.putIfAbsent(
modelName, column.name(),
column.dataType(), column.defaultValue(), scopeId, column.multiIntValues());
}
}
}
if (Objects.nonNull(clazz.getSuperclass())) {
retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
}
}
@Override
public void overrideColumnName(String columnName, String newName) {
columnNameOverrideRule.put(columnName, newName);
models.forEach(this::followColumnNameRules);
ValueColumnMetadata.INSTANCE.overrideColumnName(columnName, newName);
}
private void followColumnNameRules(Model model) {
columnNameOverrideRule.forEach((oldName, newName) -> {
model.getColumns().forEach(column -> {
log.debug("Override model column name: [{}] {} -> {}.", model.getName(), oldName, newName);
column.getColumnName().overrideName(oldName, newName);
column.getSqlDatabaseExtension()
.getIndices()
.forEach(extraQueryIndex -> extraQueryIndex.overrideName(oldName, newName));
});
});
}
private boolean addExtraColumn4AdditionalEntity(SQLDatabaseModelExtension sqlDBModelExtension,
List<ModelColumn> modelColumns,
String extraColumn, List<String> additionalTables) {
for (ModelColumn modelColumn : modelColumns) {
if (modelColumn.getColumnName().getName().equals(extraColumn)) {
additionalTables.forEach(tableName -> {
sqlDBModelExtension.appendAdditionalTable(tableName, modelColumn);
});
return true;
}
}
return false;
}
@Override
public List<Model> allModels() {
return models;
}
private static class ShardingKeyChecker {
private final ArrayList<ModelColumn> keys = new ArrayList<>();
/**
* @throws IllegalStateException if sharding key indices are conflicting.
*/
private void accept(String modelName, ModelColumn modelColumn) throws IllegalStateException {
final int idx = modelColumn.getBanyanDBExtension().getShardingKeyIdx();
while (idx + 1 > keys.size()) {
keys.add(null);
}
ModelColumn exist = keys.get(idx);
if (exist != null) {
throw new IllegalStateException(
modelName + "'s "
+ "Column [" + exist.getColumnName() + "] and column [" + modelColumn.getColumnName()
+ " are conflicting with sharding key index=" + modelColumn.getBanyanDBExtension()
.getShardingKeyIdx());
}
keys.set(idx, modelColumn);
}
/**
* @param modelName model name of the entity
* @throws IllegalStateException if sharding key indices are not continuous
*/
private void check(String modelName) throws IllegalStateException {
for (int i = 0; i < keys.size(); i++) {
final ModelColumn modelColumn = keys.get(i);
if (modelColumn == null) {
throw new IllegalStateException("Sharding key index=" + i + " is missing in " + modelName);
}
}
}
}
}