Add layer property to LAL script (#9593)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 70df7a6..a89c7fd 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -9,6 +9,7 @@
* Use prepareStatement in H2SQLExecutor#getByIDs.(No function change).
* Bump up snakeyaml to 1.31 for fixing CVE-2022-25857
* Fix `DurationUtils.convertToTimeBucket` missed verify date format.
+* [**Breaking Change**] Change the LAL script format(Add layer property).
#### UI
diff --git a/docs/en/concepts-and-designs/lal.md b/docs/en/concepts-and-designs/lal.md
index 1d85776..6fbb260 100644
--- a/docs/en/concepts-and-designs/lal.md
+++ b/docs/en/concepts-and-designs/lal.md
@@ -8,6 +8,9 @@
set `log-analyzer/default/lalFiles` in the `application.yml` file or set environment variable `SW_LOG_LAL_FILES` to
activate specific LAL config files.
+## Layer
+Layer should be declared in the LAL script to represent the analysis scope of the logs.
+
## Filter
A filter is a group of [parser](#parser), [extractor](#extractor) and [sink](#sink). Users can use one or more filters
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
index 4bb1126..c5a0fed 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
@@ -37,9 +37,9 @@
import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.YamlParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.SinkSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
-import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordSinkListener;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficSinkListener;
import org.apache.skywalking.oap.server.core.source.Log;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -49,7 +49,7 @@
public class FilterSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterSpec.class);
- private final List<LogAnalysisListenerFactory> factories;
+ private final List<LogSinkListenerFactory> sinkListenerFactories;
private final TextParserSpec textParser;
@@ -70,9 +70,9 @@
parsedType = new TypeReference<Map<String, Object>>() {
};
- factories = Arrays.asList(
- new RecordAnalysisListener.Factory(moduleManager(), moduleConfig()),
- new TrafficAnalysisListener.Factory(moduleManager(), moduleConfig())
+ sinkListenerFactories = Arrays.asList(
+ new RecordSinkListener.Factory(moduleManager(), moduleConfig()),
+ new TrafficSinkListener.Factory(moduleManager(), moduleConfig())
);
textParser = new TextParserSpec(moduleManager(), moduleConfig());
@@ -168,17 +168,17 @@
final Optional<AtomicReference<Log>> container = BINDING.get().logContainer();
if (container.isPresent()) {
- factories.stream()
- .map(LogAnalysisListenerFactory::create)
- .filter(it -> it instanceof RecordAnalysisListener)
+ sinkListenerFactories.stream()
+ .map(LogSinkListenerFactory::create)
+ .filter(it -> it instanceof RecordSinkListener)
.map(it -> it.parse(logData, extraLog))
- .map(it -> (RecordAnalysisListener) it)
- .map(RecordAnalysisListener::getLog)
+ .map(it -> (RecordSinkListener) it)
+ .map(RecordSinkListener::getLog)
.findFirst()
.ifPresent(log -> container.get().set(log));
} else {
- factories.stream()
- .map(LogAnalysisListenerFactory::create)
+ sinkListenerFactories.stream()
+ .map(LogSinkListenerFactory::create)
.forEach(it -> it.parse(logData, extraLog).build());
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
index 89c331f..8232338 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LALConfig.java
@@ -25,4 +25,6 @@
private String name;
private String dsl;
+
+ private String layer;
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
index 3b11355..88456db 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/LogAnalyzerModuleConfig.java
@@ -51,7 +51,7 @@
private List<Rule> meterConfigs;
public List<String> lalFiles() {
- return Splitter.on(",").omitEmptyStrings().splitToList(Strings.nullToEmpty(getLalFiles()));
+ return Splitter.on(",").omitEmptyStrings().trimResults().splitToList(Strings.nullToEmpty(getLalFiles()));
}
public List<Rule> malConfigs() throws ModuleStartException {
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/ILogAnalysisListenerManager.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/ILogAnalysisListenerManager.java
index 0fe3247..2b8b03e 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/ILogAnalysisListenerManager.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/ILogAnalysisListenerManager.java
@@ -19,10 +19,15 @@
import java.util.List;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
public interface ILogAnalysisListenerManager {
void addListenerFactory(LogAnalysisListenerFactory factory);
List<LogAnalysisListenerFactory> getLogAnalysisListenerFactories();
+
+ void addSinkListenerFactory(LogSinkListenerFactory factory);
+
+ List<LogSinkListenerFactory> getSinkListenerFactory();
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
index 966d125..73909b1 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzer.java
@@ -20,9 +20,13 @@
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListener;
@@ -46,26 +50,41 @@
log.debug("The log is ignored because the Service name is empty");
return;
}
- createListeners();
+ Layer layer;
+ if ("".equals(builder.getLayer())) {
+ layer = Layer.GENERAL;
+ } else {
+ try {
+ layer = Layer.nameOf(builder.getLayer());
+ } catch (UnexpectedException e) {
+ log.warn("The Layer {} is not found, abandon the log.", builder.getLayer());
+ return;
+ }
+ }
+
+ createAnalysisListeners(layer);
if (builder.getTimestamp() == 0) {
// If no timestamp, OAP server would use the received timestamp as log's timestamp
builder.setTimestamp(System.currentTimeMillis());
}
- notifyListener(builder, extraLog);
- notifyListenerToBuild();
+ notifyAnalysisListener(builder, extraLog);
+ notifyAnalysisListenerToBuild();
}
- private void notifyListener(LogData.Builder builder, final Message extraLog) {
+ private void notifyAnalysisListener(LogData.Builder builder, final Message extraLog) {
listeners.forEach(listener -> listener.parse(builder, extraLog));
}
- private void notifyListenerToBuild() {
+ private void notifyAnalysisListenerToBuild() {
listeners.forEach(LogAnalysisListener::build);
}
- private void createListeners() {
+ private void createAnalysisListeners(Layer layer) {
factoryManager.getLogAnalysisListenerFactories()
- .forEach(factory -> listeners.add(factory.create()));
+ .stream()
+ .map(factory -> factory.create(layer))
+ .filter(Objects::nonNull)
+ .forEach(listeners::add);
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzerServiceImpl.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzerServiceImpl.java
index dbe5004..d5dbcd1 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzerServiceImpl.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/LogAnalyzerServiceImpl.java
@@ -24,13 +24,15 @@
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
+import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@RequiredArgsConstructor
public class LogAnalyzerServiceImpl implements ILogAnalyzerService, ILogAnalysisListenerManager {
private final ModuleManager moduleManager;
private final LogAnalyzerModuleConfig moduleConfig;
- private final List<LogAnalysisListenerFactory> factories = new ArrayList<>();
+ private final List<LogAnalysisListenerFactory> analysisListenerFactories = new ArrayList<>();
+ private final List<LogSinkListenerFactory> sinkListenerFactories = new ArrayList<>();
@Override
public void doAnalysis(final LogData.Builder log, Message extraLog) {
@@ -40,11 +42,21 @@
@Override
public void addListenerFactory(final LogAnalysisListenerFactory factory) {
- factories.add(factory);
+ analysisListenerFactories.add(factory);
}
@Override
public List<LogAnalysisListenerFactory> getLogAnalysisListenerFactories() {
- return factories;
+ return analysisListenerFactories;
+ }
+
+ @Override
+ public void addSinkListenerFactory(LogSinkListenerFactory factory) {
+ sinkListenerFactories.add(factory);
+ }
+
+ @Override
+ public List<LogSinkListenerFactory> getSinkListenerFactory() {
+ return sinkListenerFactories;
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
index e5273e4..f43c3a6 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListener.java
@@ -25,8 +25,7 @@
*/
public interface LogAnalysisListener {
/**
- * The last step of the analysis process. Typically, the implementations forward the analysis results to the source
- * receiver.
+ * The last step of the analysis process. Typically, the implementations execute corresponding DSL.
*/
void build();
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
index 78d1e9f..8955adf 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogAnalysisListenerFactory.java
@@ -17,11 +17,13 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+
/**
* LogAnalysisListenerFactory implementation creates the listener instance when required.
* Every LogAnalysisListener could have its own creation factory.
*/
public interface LogAnalysisListenerFactory {
- LogAnalysisListener create();
+ LogAnalysisListener create(Layer layer);
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
index 27a5ec3..28bdd09 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java
@@ -19,61 +19,74 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
import org.apache.skywalking.oap.log.analyzer.dsl.DSL;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfig;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfigs;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+
+import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@Slf4j
@RequiredArgsConstructor
public class LogFilterListener implements LogAnalysisListener {
- private final List<DSL> dsls;
+ @lombok.NonNull
+ private final DSL dsl;
@Override
public void build() {
- dsls.forEach(dsl -> {
- try {
- dsl.evaluate();
- } catch (final Exception e) {
- log.warn("Failed to evaluate dsl: {}", dsl, e);
- }
- });
+ try {
+ dsl.evaluate();
+ } catch (final Exception e) {
+ log.warn("Failed to evaluate dsl: {}", dsl, e);
+ }
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData,
final Message extraLog) {
- dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())
- .extraLog(extraLog)));
+ dsl.bind(new Binding().log(logData.build()).extraLog(extraLog));
return this;
}
public static class Factory implements LogAnalysisListenerFactory {
- private final List<DSL> dsls;
+ private final Map<Layer, DSL> dsls;
public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception {
- dsls = new ArrayList<>();
+ dsls = new HashMap<>();
final List<LALConfig> configList = LALConfigs.load(config.getLalPath(), config.lalFiles())
.stream()
.flatMap(it -> it.getRules().stream())
.collect(Collectors.toList());
for (final LALConfig c : configList) {
- dsls.add(DSL.of(moduleManager, config, c.getDsl()));
+ Layer layer = Layer.nameOf(c.getLayer());
+ if (dsls.put(layer, DSL.of(moduleManager, config, c.getDsl())) != null) {
+ throw new ModuleStartException("Layer " + layer.name() + " has already set a rule.");
+ }
}
}
@Override
- public LogAnalysisListener create() {
- return new LogFilterListener(dsls);
+ public LogAnalysisListener create(Layer layer) {
+ if (layer == null) {
+ return null;
+ }
+ final DSL dsl = dsls.get(layer);
+ if (dsl == null) {
+ return null;
+ }
+ return new LogFilterListener(dsl);
}
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListener.java
new file mode 100644
index 0000000..0a763c0
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+
+import com.google.protobuf.Message;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+
+public interface LogSinkListener {
+ /**
+ * The last step of the sink process. Typically, the implementations forward the results to the source
+ * receiver.
+ */
+ void build();
+
+ /**
+ * Parse the raw data from the probe.
+ * @return {@code this} for chaining.
+ */
+ LogSinkListener parse(LogData.Builder logData, final Message extraLog);
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListenerFactory.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListenerFactory.java
new file mode 100644
index 0000000..571bb84
--- /dev/null
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogSinkListenerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
+
+/**
+ * LogSinkListenerFactory implementation creates the listener instance when required.
+ * Every LogSinkListener could have its own creation factory.
+ */
+public interface LogSinkListenerFactory {
+ LogSinkListener create();
+}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordSinkListener.java
similarity index 93%
rename from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
rename to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordSinkListener.java
index b378b20..8a866ef 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/RecordSinkListener.java
@@ -29,6 +29,7 @@
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
+
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@@ -48,10 +49,10 @@
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
- * RecordAnalysisListener forwards the log data to the persistence layer with the query required conditions.
+ * RecordSinkListener forwards the log data to the persistence layer with the query required conditions.
*/
@RequiredArgsConstructor
-public class RecordAnalysisListener implements LogAnalysisListener {
+public class RecordSinkListener implements LogSinkListener {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
@@ -66,7 +67,7 @@
@Override
@SneakyThrows
- public LogAnalysisListener parse(final LogData.Builder logData,
+ public LogSinkListener parse(final LogData.Builder logData,
final Message extraLog) {
LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
@@ -142,7 +143,7 @@
});
}
- public static class Factory implements LogAnalysisListenerFactory {
+ public static class Factory implements LogSinkListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
@@ -161,8 +162,8 @@
}
@Override
- public LogAnalysisListener create() {
- return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys);
+ public RecordSinkListener create() {
+ return new RecordSinkListener(sourceReceiver, namingControl, searchableTagKeys);
}
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficSinkListener.java
similarity index 93%
rename from oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
rename to oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficSinkListener.java
index c84f3ba..1e7c9e3 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficAnalysisListener.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/TrafficSinkListener.java
@@ -40,7 +40,7 @@
* Generate service, service instance and endpoint traffic by log data.
*/
@RequiredArgsConstructor
-public class TrafficAnalysisListener implements LogAnalysisListener {
+public class TrafficSinkListener implements LogSinkListener {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
@@ -62,7 +62,7 @@
}
@Override
- public LogAnalysisListener parse(final LogData.Builder logData,
+ public LogSinkListener parse(final LogData.Builder logData,
final Message extraLog) {
Layer layer;
if (StringUtil.isNotEmpty(logData.getLayer())) {
@@ -97,7 +97,7 @@
return this;
}
- public static class Factory implements LogAnalysisListenerFactory {
+ public static class Factory implements LogSinkListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
@@ -111,8 +111,8 @@
}
@Override
- public LogAnalysisListener create() {
- return new TrafficAnalysisListener(sourceReceiver, namingControl);
+ public LogSinkListener create() {
+ return new TrafficSinkListener(sourceReceiver, namingControl);
}
}
}
diff --git a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLSecurityTest.java b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLSecurityTest.java
index 2f0cb26..51efffe 100644
--- a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLSecurityTest.java
+++ b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLSecurityTest.java
@@ -131,7 +131,7 @@
public void testSecurity() throws ModuleStartException {
final DSL dsl = DSL.of(manager, new LogAnalyzerModuleConfig(), script);
Whitebox.setInternalState(
- Whitebox.getInternalState(dsl, "filterSpec"), "factories", Collections.emptyList()
+ Whitebox.getInternalState(dsl, "filterSpec"), "sinkListenerFactories", Collections.emptyList()
);
dsl.bind(new Binding().log(LogData.newBuilder()));
diff --git a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
index 71dcbc8..6a5a733 100644
--- a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
+++ b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
@@ -191,7 +191,7 @@
public void testDslStaticCompile() throws ModuleStartException {
final DSL dsl = DSL.of(manager, new LogAnalyzerModuleConfig(), script);
Whitebox.setInternalState(
- Whitebox.getInternalState(dsl, "filterSpec"), "factories", Collections.emptyList()
+ Whitebox.getInternalState(dsl, "filterSpec"), "sinkListenerFactories", Collections.emptyList()
);
dsl.bind(new Binding().log(LogData.newBuilder().build()));
diff --git a/oap-server/server-starter/src/main/resources/lal/default.yaml b/oap-server/server-starter/src/main/resources/lal/default.yaml
index fb18825..12317a9 100644
--- a/oap-server/server-starter/src/main/resources/lal/default.yaml
+++ b/oap-server/server-starter/src/main/resources/lal/default.yaml
@@ -16,6 +16,7 @@
# The default LAL script to save all logs, behaving like the versions before 8.5.0.
rules:
- name: default
+ layer: GENERAL
dsl: |
filter {
sink {
diff --git a/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml b/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml
index 2da0292..cd326a1 100644
--- a/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml
+++ b/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml
@@ -15,6 +15,7 @@
rules:
- name: envoy-als
+ layer: MESH
dsl: |
filter {
// only collect abnormal logs (http status code >= 300, or commonProperties?.responseFlags is not empty)
diff --git a/test/e2e-v2/cases/log/lal.yaml b/test/e2e-v2/cases/log/lal.yaml
index bfd7a61..4d98c93 100644
--- a/test/e2e-v2/cases/log/lal.yaml
+++ b/test/e2e-v2/cases/log/lal.yaml
@@ -15,6 +15,7 @@
rules:
- name: example
+ layer: GENERAL
dsl: |
filter {
text {
diff --git a/test/e2e-v2/cases/satellite/native-protocols/lal.yaml b/test/e2e-v2/cases/satellite/native-protocols/lal.yaml
index aa0922d..e94fb2a 100644
--- a/test/e2e-v2/cases/satellite/native-protocols/lal.yaml
+++ b/test/e2e-v2/cases/satellite/native-protocols/lal.yaml
@@ -15,6 +15,7 @@
rules:
- name: example
+ layer: GENERAL
dsl: |
filter {
text {