feat(#3353): Support sign and encrypt security modes in OPC-UA adapteā¦ (#3354)
* feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and sink
* Fix test
* Fix dependency convergence
* Modify OPC e2e tests
* Improve completed static property validation
* Minor bug fixes
* Fix test
* Add additional environment variables
diff --git a/pom.xml b/pom.xml
index c1285c0..8f0d8ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
<amqp-client.version>5.21.0</amqp-client.version>
<apache-sis-referencing.version>1.2</apache-sis-referencing.version>
<boofcv.version>1.1.0</boofcv.version>
+ <bcprov-jdk18on.version>1.78.1</bcprov-jdk18on.version>
<classindex.version>3.9</classindex.version>
<checker-qual.version>3.43.0</checker-qual.version>
<commons-codec.version>1.17.0</commons-codec.version>
@@ -54,7 +55,7 @@
<commons-pool2.version>2.12.0</commons-pool2.version>
<commons-text.version>1.12.0</commons-text.version>
<ditto-client.version>1.0.0</ditto-client.version>
- <eclipse.milo.version>0.6.9</eclipse.milo.version>
+ <eclipse.milo.version>0.6.14</eclipse.milo.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
@@ -399,6 +400,16 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ <version>${bcprov-jdk18on.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcutil-jdk18on</artifactId>
+ <version>${bcprov-jdk18on.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-turtle</artifactId>
<version>${rdf4j.version}</version>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7ad56fe..7a2d033 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -108,7 +108,18 @@
// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
- SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
+ SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),
+
+ // OPC-UA security
+ SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/streampipes-security/opcua"),
+ SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
+ SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
+ SP_OPCUA_KEYSTORE_TYPE("SP_OPCUA_KEYSTORE_TYPE", "PKCS12"),
+ SP_OPCUA_KEYSTORE_ALIAS("SP_OPCUA_KEYSTORE_ALIAS", "apache-streampipes"),
+ SP_OPCUA_APPLICATION_URI(
+ "SP_OPCUA_APPLICATION_URI",
+ "urn:org:apache:streampipes:opcua:client"
+ );
private final String envVariableName;
private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 36ed402..bb5bbac 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -328,4 +328,33 @@
return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
}
+ @Override
+ public StringEnvironmentVariable getOpcUaSecurityDir() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaKeystoreFile() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaKeystorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaApplicationUri() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOPcUaKeystoreType() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaKeystoreAlias() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1d6efa..cb441f0 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -36,7 +36,9 @@
IntEnvironmentVariable getServicePort();
StringEnvironmentVariable getSpCoreScheme();
+
StringEnvironmentVariable getSpCoreHost();
+
IntEnvironmentVariable getSpCorePort();
// Time series storage env variables
@@ -144,12 +146,15 @@
// Broker defaults
StringEnvironmentVariable getKafkaHost();
+
IntEnvironmentVariable getKafkaPort();
StringEnvironmentVariable getMqttHost();
+
IntEnvironmentVariable getMqttPort();
StringEnvironmentVariable getNatsHost();
+
IntEnvironmentVariable getNatsPort();
StringEnvironmentVariable getPulsarUrl();
@@ -158,4 +163,15 @@
StringEnvironmentVariable getAllowedUploadFiletypes();
+ StringEnvironmentVariable getOpcUaSecurityDir();
+
+ StringEnvironmentVariable getOpcUaKeystoreFile();
+
+ StringEnvironmentVariable getOpcUaKeystorePassword();
+
+ StringEnvironmentVariable getOpcUaApplicationUri();
+
+ StringEnvironmentVariable getOPcUaKeystoreType();
+
+ StringEnvironmentVariable getOpcUaKeystoreAlias();
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/pom.xml b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
index 71eaf09..673e685 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
@@ -21,9 +21,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-parent</artifactId>
+ <artifactId>streampipes-extensions</artifactId>
<version>0.97.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
+ <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>streampipes-connectors-opcua</artifactId>
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index 63aacda..03fc844 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -23,25 +23,35 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
+import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
+import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
import java.util.List;
public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {
+
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUaConnectorsModuleExport() {
+ this.clientProvider = new OpcUaClientProvider();
+ }
+
@Override
public List<StreamPipesAdapter> adapters() {
return List.of(
- new OpcUaAdapter()
+ new OpcUaAdapter(clientProvider)
);
}
@Override
public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
- new OpcUaSink()
+ new OpcUaSink(clientProvider)
);
}
@@ -50,7 +60,9 @@
return List.of(
new OpcUaAdapterMigrationV1(),
new OpcUaAdapterMigrationV2(),
- new OpcUaAdapterMigrationV3()
+ new OpcUaAdapterMigrationV3(),
+ new OpcUaAdapterMigrationV4(),
+ new OpcUaSinkMigrationV1()
);
}
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index baf5cfe..526ff7b 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -29,7 +29,8 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
@@ -42,7 +43,6 @@
import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -66,7 +66,6 @@
import java.util.stream.Collectors;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
@@ -78,7 +77,9 @@
private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);
private int pullingIntervalMilliSeconds;
- private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
+ private final OpcUaClientProvider clientProvider;
+ private ConnectedOpcUaClient connectedClient;
+ private OpcUaAdapterConfig opcUaAdapterConfig;
private List<OpcNode> allNodes;
private List<NodeId> allNodeIds;
private int numberProperties;
@@ -92,15 +93,14 @@
*/
private final Map<String, String> nodeIdToLabelMapping;
- public OpcUaAdapter() {
- super();
+ public OpcUaAdapter(OpcUaClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
this.numberProperties = 0;
this.event = new HashMap<>();
this.nodeIdToLabelMapping = new HashMap<>();
}
private void prepareAdapter(IAdapterParameterExtractor extractor) throws AdapterException {
-
this.allNodeIds = new ArrayList<>();
List<String> deleteKeys = extractor
.getAdapterDescription()
@@ -111,9 +111,9 @@
.collect(Collectors.toList());
try {
- this.spOpcUaClient.connect();
+ this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
OpcUaNodeBrowser browserClient =
- new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(this.connectedClient.getClient(), this.opcUaAdapterConfig);
this.allNodes = browserClient.findNodes(deleteKeys);
@@ -121,11 +121,11 @@
this.allNodeIds.add(node.getNodeId());
}
- if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
- this.pullingIntervalMilliSeconds = spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
+ if (opcUaAdapterConfig.inPullMode()) {
+ this.pullingIntervalMilliSeconds = opcUaAdapterConfig.getPullIntervalMilliSeconds();
} else {
this.numberProperties = this.allNodeIds.size();
- this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+ this.connectedClient.createListSubscription(this.allNodeIds, this);
}
this.allNodes.forEach(node -> this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
@@ -139,7 +139,7 @@
@Override
public void pullData() throws ExecutionException, RuntimeException, InterruptedException, TimeoutException {
var response =
- this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
+ this.connectedClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
boolean badStatusCodeReceived = false;
boolean emptyValueReceived = false;
List<DataValue> returnValues =
@@ -168,7 +168,7 @@
private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
return badStatusCodeReceived
- && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+ && this.opcUaAdapterConfig.getIncompleteEventStrategy()
.equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
}
@@ -208,13 +208,13 @@
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
- this.spOpcUaClient = new SpOpcUaClient<>(
- SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
- );
+ this.opcUaAdapterConfig =
+ SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
+ //this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
this.collector = collector;
this.prepareAdapter(extractor);
- if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+ if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(this, extractor.getAdapterDescription().getElementId());
}
@@ -223,9 +223,9 @@
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
- this.spOpcUaClient.disconnect();
+ clientProvider.releaseClient(this.opcUaAdapterConfig);
- if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+ if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler.shutdown();
}
}
@@ -233,12 +233,12 @@
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+ return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
}
@Override
public IAdapterConfiguration declareConfig() {
- var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
+ var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new OpcUaAdapter(clientProvider))
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
@@ -255,6 +255,6 @@
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
- return getSchema(extractor);
+ return getSchema(clientProvider, extractor);
}
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
index e7dde9d..2a150fa 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
@@ -25,6 +25,7 @@
import org.eclipse.milo.opcua.sdk.client.AddressSpace;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -45,13 +46,13 @@
public class OpcUaNodeBrowser {
- private final OpcUaClient client;
+ private final UaClient client;
private final OpcUaConfig spOpcConfig;
private static final Logger LOG = LoggerFactory.getLogger(OpcUaNodeBrowser.class);
public OpcUaNodeBrowser(
- OpcUaClient client,
+ UaClient client,
OpcUaConfig spOpcUaClientConfig
) {
this.client = client;
@@ -127,7 +128,7 @@
}
private List<TreeInputNode> findChildren(
- OpcUaClient client,
+ UaClient client,
NodeId nodeId
) throws UaException {
return client
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
index 04dda6e..bcaf11a 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.extensions.connectors.opcua.adapter;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
@@ -32,12 +32,12 @@
import java.util.concurrent.ExecutionException;
public class OpcUaNodeMetadataExtractor {
- private final OpcUaClient client;
+ private final UaClient client;
private final UaNode node;
private final Map<String, Object> metadata;
- public OpcUaNodeMetadataExtractor(OpcUaClient client, UaNode node) {
+ public OpcUaNodeMetadataExtractor(UaClient client, UaNode node) {
this.client = client;
this.node = node;
this.metadata = new HashMap<>();
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
new file mode 100644
index 0000000..d71d856
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ConnectedOpcUaClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectedOpcUaClient.class);
+ private final UaClient client;
+ private static final AtomicLong clientHandles = new AtomicLong(1L);
+
+ public ConnectedOpcUaClient(UaClient client) {
+ this.client = client;
+ }
+
+ /***
+ * Register subscriptions for given OPC UA nodes
+ * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
+ * @param opcUaAdapter current instance of {@link OpcUaAdapter}
+ * @throws Exception
+ */
+ public void createListSubscription(List<NodeId> nodes,
+ OpcUaAdapter opcUaAdapter) throws Exception {
+ client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
+ @Override
+ public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
+ LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
+ try {
+ initSubscription(nodes, opcUaAdapter);
+ } catch (Exception e) {
+ LOG.error("Re-creating the subscription failed", e);
+ }
+ }
+ });
+
+ initSubscription(nodes, opcUaAdapter);
+ }
+
+
+ public void initSubscription(List<NodeId> nodes,
+ OpcUaAdapter opcUaAdapter) throws Exception {
+ /*
+ * create a subscription @ 1000ms
+ */
+ UaSubscription subscription = this.client.getSubscriptionManager().createSubscription(1000.0).get();
+
+ List<CompletableFuture<DataValue>> values = new ArrayList<>();
+
+ for (NodeId node : nodes) {
+ values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
+ }
+
+ for (CompletableFuture<DataValue> value : values) {
+ if (value.get().getValue().toString().contains("null")) {
+ LOG.error("Node has no value");
+ }
+ }
+
+
+ List<ReadValueId> readValues = new ArrayList<>();
+ // Read a specific value attribute
+ for (NodeId node : nodes) {
+ readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE));
+ }
+
+ List<MonitoredItemCreateRequest> requests = new ArrayList<>();
+
+ for (ReadValueId readValue : readValues) {
+ // important: client handle must be unique per item
+ UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+ MonitoringParameters parameters = new MonitoringParameters(
+ clientHandle,
+ 1000.0, // sampling interval
+ null, // filter, null means use default
+ uint(10), // queue size
+ true // discard oldest
+ );
+
+ requests.add(new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters));
+ }
+
+ UaSubscription.ItemCreationCallback onItemCreated =
+ (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
+ List<UaMonitoredItem> items = subscription.createMonitoredItems(
+ TimestampsToReturn.Both,
+ requests,
+ onItemCreated
+ ).get();
+
+ for (UaMonitoredItem item : items) {
+ NodeId tagId = item.getReadValueId().getNodeId();
+ if (item.getStatusCode().isGood()) {
+ LOG.info("item created for nodeId=" + tagId);
+ } else {
+ LOG.error("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
+ }
+ }
+ }
+
+ /***
+ *
+ * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
+ */
+ public UaClient getClient() {
+ return this.client;
+ }
+
+ public void disconnect() {
+ client.disconnect();
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
new file mode 100644
index 0000000..b3f907b
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+public class OpcUaClientProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OpcUaClientProvider.class);
+
+ private final Map<String, ConnectedOpcUaClient> clients = new ConcurrentHashMap<>();
+ private final Map<String, Integer> consumers = new ConcurrentHashMap<>();
+
+ public synchronized <T extends OpcUaConfig> ConnectedOpcUaClient getClient(T config)
+ throws UaException, SpConfigurationException, URISyntaxException, ExecutionException, InterruptedException {
+ var serverId = config.getUniqueServerId();
+ if (clients.containsKey(serverId)) {
+ LOG.debug("Adding new consumer to client {}", serverId);
+ consumers.put(serverId, consumers.get(config.getUniqueServerId()) + 1);
+ return clients.get(serverId);
+ } else {
+ LOG.debug("Creating new client {}", serverId);
+ var connectedClient = new SpOpcUaClient<>(config).connect();
+ clients.put(serverId, connectedClient);
+ consumers.put(serverId, 1);
+ return connectedClient;
+ }
+ }
+
+ public <T extends OpcUaConfig> void releaseClient(T config) {
+ String serverId = config.getUniqueServerId();
+ LOG.debug("Releasing client {}", serverId);
+
+ synchronized (this) {
+ consumers.computeIfPresent(serverId, (key, count) -> {
+ int updatedCount = count - 1;
+ if (updatedCount <= 0) {
+ LOG.debug("Disconnecting client {}", serverId);
+ if (clients.containsKey(serverId)) {
+ clients.get(serverId).disconnect();
+ clients.remove(serverId);
+ }
+ return null;
+ }
+ return updatedCount;
+ });
+ }
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
index 95fda81..a17d373 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
@@ -20,38 +20,17 @@
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
-import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
-import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
-import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
/***
* Wrapper class for all OPC UA specific stuff.
@@ -63,120 +42,26 @@
private OpcUaClient client;
private final T spOpcConfig;
- private static final AtomicLong clientHandles = new AtomicLong(1L);
-
public SpOpcUaClient(T config) {
this.spOpcConfig = config;
}
/***
- *
- * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
- */
- public OpcUaClient getClient() {
- return this.client;
- }
-
- /***
* Establishes appropriate connection to OPC UA endpoint depending on the {@link SpOpcUaClient} instance
*
* @throws UaException An exception occurring during OPC connection
*/
- public void connect()
+ public ConnectedOpcUaClient connect()
throws UaException, ExecutionException, InterruptedException, SpConfigurationException, URISyntaxException {
OpcUaClientConfig clientConfig = new MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
- this.client = OpcUaClient.create(clientConfig);
+ var client = OpcUaClient.create(clientConfig);
client.connect().get();
- }
-
- public void disconnect() {
- client.disconnect();
- }
-
- /***
- * Register subscriptions for given OPC UA nodes
- * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
- * @param opcUaAdapter current instance of {@link OpcUaAdapter}
- * @throws Exception
- */
- public void createListSubscription(List<NodeId> nodes,
- OpcUaAdapter opcUaAdapter) throws Exception {
- client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
- @Override
- public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
- LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
- try {
- initSubscription(nodes, opcUaAdapter);
- } catch (Exception e) {
- LOG.error("Re-creating the subscription failed", e);
- }
- }
- });
-
- initSubscription(nodes, opcUaAdapter);
+ return new ConnectedOpcUaClient(client);
}
- public void initSubscription(List<NodeId> nodes,
- OpcUaAdapter opcUaAdapter) throws Exception {
- /*
- * create a subscription @ 1000ms
- */
- UaSubscription subscription = this.client.getSubscriptionManager().createSubscription(1000.0).get();
-
- List<CompletableFuture<DataValue>> values = new ArrayList<>();
-
- for (NodeId node : nodes) {
- values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
- }
-
- for (CompletableFuture<DataValue> value : values) {
- if (value.get().getValue().toString().contains("null")) {
- LOG.error("Node has no value");
- }
- }
- List<ReadValueId> readValues = new ArrayList<>();
- // Read a specific value attribute
- for (NodeId node : nodes) {
- readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE));
- }
-
- List<MonitoredItemCreateRequest> requests = new ArrayList<>();
-
- for (ReadValueId readValue : readValues) {
- // important: client handle must be unique per item
- UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
- MonitoringParameters parameters = new MonitoringParameters(
- clientHandle,
- 1000.0, // sampling interval
- null, // filter, null means use default
- uint(10), // queue size
- true // discard oldest
- );
-
- requests.add(new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters));
- }
-
- UaSubscription.ItemCreationCallback onItemCreated =
- (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
- List<UaMonitoredItem> items = subscription.createMonitoredItems(
- TimestampsToReturn.Both,
- requests,
- onItemCreated
- ).get();
-
- for (UaMonitoredItem item : items) {
- NodeId tagId = item.getReadValueId().getNodeId();
- if (item.getStatusCode().isGood()) {
- LOG.info("item created for nodeId=" + tagId);
- } else {
- LOG.error("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
- }
- }
- }
public T getSpOpcConfig() {
return spOpcConfig;
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
index 9813d52..8db41a8 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
@@ -18,19 +18,15 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
-import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -39,65 +35,16 @@
public OpcUaClientConfig makeClientConfig(OpcUaConfig spOpcConfig)
throws ExecutionException, InterruptedException, SpConfigurationException, URISyntaxException {
String opcServerUrl = spOpcConfig.getOpcServerURL();
+ String applicationUri = Environments.getEnvironment().getOpcUaApplicationUri().getValueOrDefault();
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(opcServerUrl).get();
- String host = opcServerUrl.split("://")[1].split(":")[0];
- EndpointDescription tmpEndpoint = endpoints
- .stream()
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst()
- .orElseThrow(() -> new SpConfigurationException("No endpoint with security policy none"));
+ var builder = OpcUaClientConfig.builder()
+ .setApplicationName(LocalizedText.english("Apache StreamPipes"))
+ .setApplicationUri(applicationUri);
- tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
- endpoints = Collections.singletonList(tmpEndpoint);
+ spOpcConfig.getSecurityConfig().configureSecurityPolicy(opcServerUrl, endpoints, builder);
+ spOpcConfig.getIdentityConfig().configureIdentity(builder);
- EndpointDescription endpoint = endpoints
- .stream()
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst().orElseThrow(() -> new SpConfigurationException("no desired endpoints returned"));
-
- return buildConfig(endpoint, spOpcConfig);
- }
-
- private OpcUaClientConfig buildConfig(EndpointDescription endpoint,
- OpcUaConfig spOpcConfig) {
-
- OpcUaClientConfigBuilder builder = defaultBuilder(endpoint);
- if (!spOpcConfig.isUnauthenticated()) {
- builder.setIdentityProvider(new UsernameProvider(spOpcConfig.getUsername(), spOpcConfig.getPassword()));
- }
return builder.build();
}
-
- private OpcUaClientConfigBuilder defaultBuilder(EndpointDescription endpoint) {
- return OpcUaClientConfig.builder()
- .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
- .setApplicationUri("urn:eclipse:milo:examples:client")
- .setEndpoint(endpoint);
- }
-
- private EndpointDescription updateEndpointUrl(
- EndpointDescription original, String hostname) throws URISyntaxException {
-
- URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
- String endpointUrl = String.format(
- "%s://%s:%s%s",
- uri.getScheme(),
- hostname,
- uri.getPort(),
- uri.getPath()
- );
-
- return new EndpointDescription(
- endpointUrl,
- original.getServer(),
- original.getServerCertificate(),
- original.getSecurityMode(),
- original.getSecurityPolicyUri(),
- original.getUserIdentityTokens(),
- original.getTransportProfileUri(),
- original.getSecurityLevel()
- );
- }
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
index 6db027d..128f1bc 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
@@ -18,15 +18,21 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.IdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
+
import java.util.List;
public class OpcUaConfig {
private String opcServerURL;
- private boolean unauthenticated;
- private String username;
- private String password;
private List<String> selectedNodeNames;
+ private IdentityConfig identityConfig;
+ private SecurityConfig securityPolicyConfig;
+
+ public OpcUaConfig() {
+
+ }
public String getOpcServerURL() {
return opcServerURL;
@@ -36,30 +42,6 @@
this.opcServerURL = opcServerURL;
}
- public boolean isUnauthenticated() {
- return unauthenticated;
- }
-
- public void setUnauthenticated(boolean unauthenticated) {
- this.unauthenticated = unauthenticated;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
public List<String> getSelectedNodeNames() {
return selectedNodeNames;
}
@@ -67,4 +49,24 @@
public void setSelectedNodeNames(List<String> selectedNodeNames) {
this.selectedNodeNames = selectedNodeNames;
}
+
+ public IdentityConfig getIdentityConfig() {
+ return identityConfig;
+ }
+
+ public void setIdentityConfig(IdentityConfig identityConfig) {
+ this.identityConfig = identityConfig;
+ }
+
+ public SecurityConfig getSecurityConfig() {
+ return securityPolicyConfig;
+ }
+
+ public void setSecurityConfig(SecurityConfig securityPolicyConfig) {
+ this.securityPolicyConfig = securityPolicyConfig;
+ }
+
+ public String getUniqueServerId() {
+ return String.format("%s-%s-%s", opcServerURL, securityPolicyConfig, identityConfig);
+ }
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index 9b5f541..1059ce7 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
@@ -29,7 +30,6 @@
import java.util.List;
import static org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.HOST_PORT;
@@ -41,7 +41,6 @@
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
@@ -51,14 +50,27 @@
public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
public static final String INCOMPLETE_OPTION_SEND = "send-event";
+ public static final String SECURITY_MODE = "securityMode";
+ public static final String SECURITY_POLICY = "securityPolicy";
+ public static final String USER_AUTHENTICATION = "userAuthentication";
+ public static final String USER_AUTHENTICATION_ANONYMOUS = "anonymous";
+
public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?> builder,
boolean adapterConfig) {
var dependsOn = getDependsOn(adapterConfig);
builder
- .requiredAlternatives(Labels.withId(ACCESS_MODE),
- Alternatives.from(Labels.withId(UNAUTHENTICATED)),
+ .requiredSingleValueSelection(
+ Labels.withId(SECURITY_MODE),
+ SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new Option(mode.k, mode.v)).toList()
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(SECURITY_POLICY),
+ SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new Option(p.name())).toList()
+ )
+ .requiredAlternatives(Labels.withId(USER_AUTHENTICATION),
+ Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
Alternatives.from(Labels.withId(USERNAME_GROUP),
StaticProperties.group(
Labels.withId(USERNAME_GROUP),
@@ -104,7 +116,7 @@
public static OneOfStaticProperty getIncompleteEventConfig() {
return StaticProperties.singleValueSelection(
- Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+ Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
List.of(
new Option("Ignore (only complete messages are sent)", INCOMPLETE_OPTION_IGNORE),
new Option("Send (incomplete messages are sent)", INCOMPLETE_OPTION_SEND)
@@ -115,10 +127,12 @@
public static List<String> getDependsOn(boolean adapterConfig) {
return adapterConfig ? List.of(
ADAPTER_TYPE.name(),
- ACCESS_MODE.name(),
+ SECURITY_MODE,
+ SECURITY_POLICY,
OPC_HOST_OR_URL.name()
) : List.of(
- ACCESS_MODE.name(),
+ SECURITY_MODE,
+ SECURITY_POLICY,
OPC_HOST_OR_URL.name());
}
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 6741a3e..333dbad 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -20,11 +20,16 @@
import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
import java.util.List;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_HOST_OR_URL;
@@ -35,7 +40,6 @@
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
public class SpOpcUaConfigExtractor {
@@ -71,20 +75,31 @@
}
public static <T extends OpcUaConfig> T extractSharedConfig(IParameterExtractor extractor,
- T config) {
+ T config) {
String selectedAlternativeConnection =
extractor.selectedAlternativeInternalId(OPC_HOST_OR_URL.name());
+
String selectedAlternativeAuthentication =
- extractor.selectedAlternativeInternalId(ACCESS_MODE.name());
+ extractor.selectedAlternativeInternalId(SharedUserConfiguration.USER_AUTHENTICATION);
+
List<String> selectedNodeNames =
extractor.selectedTreeNodesInternalNames(AVAILABLE_NODES.name(), String.class);
-
config.setSelectedNodeNames(selectedNodeNames);
- boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
- boolean unauthenticated = selectedAlternativeAuthentication.equals(UNAUTHENTICATED.name());
+ String selectedSecurityMode = extractor.selectedSingleValueInternalName(
+ SharedUserConfiguration.SECURITY_MODE,
+ String.class
+ );
+ String selectedSecurityPolicy = extractor.selectedSingleValue(
+ SharedUserConfiguration.SECURITY_POLICY,
+ String.class
+ );
+ config.setSecurityConfig(new SecurityConfig(
+ MessageSecurityMode.valueOf(selectedSecurityMode),
+ SecurityPolicy.valueOf(selectedSecurityPolicy)));
+ boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
if (useURL) {
String serverAddress =
extractor.singleValueParameter(OPC_SERVER_URL.name(), String.class);
@@ -97,15 +112,15 @@
config.setOpcServerURL(serverAddress + ":" + port);
}
+ boolean unauthenticated = selectedAlternativeAuthentication.equals(
+ SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS
+ );
if (unauthenticated) {
- config.setUnauthenticated(true);
+ config.setIdentityConfig(new AnonymousIdentityConfig());
} else {
String username = extractor.singleValueParameter(USERNAME.name(), String.class);
String password = extractor.secretValue(PASSWORD.name());
-
- config.setUsername(username);
- config.setPassword(password);
- config.setUnauthenticated(false);
+ config.setIdentityConfig(new UsernamePasswordIdentityConfig(username, password));
}
return config;
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
new file mode 100644
index 0000000..ecaad66
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+
+public class AnonymousIdentityConfig implements IdentityConfig {
+
+ @Override
+ public void configureIdentity(OpcUaClientConfigBuilder builder) {
+ builder.setIdentityProvider(new AnonymousProvider());
+ }
+
+ @Override
+ public String toString() {
+ return "anonymous";
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
new file mode 100644
index 0000000..db51fd3
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+
+public interface IdentityConfig {
+
+ void configureIdentity(OpcUaClientConfigBuilder builder);
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
new file mode 100644
index 0000000..825fe1f
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+
+public class UsernamePasswordIdentityConfig implements IdentityConfig {
+
+ private final String username;
+ private final String password;
+
+ public UsernamePasswordIdentityConfig(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
+
+ @Override
+ public void configureIdentity(OpcUaClientConfigBuilder builder) {
+ builder.setIdentityProvider(new UsernameProvider(username, password));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s-%S", username, DigestUtils.sha256Hex(password));
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
new file mode 100644
index 0000000..4b132d1
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+
+public class KeyStoreLoader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KeyStoreLoader.class);
+
+ private X509Certificate[] clientCertificateChain;
+ private X509Certificate clientCertificate;
+ private KeyPair clientKeyPair;
+
+ public KeyStoreLoader load(Environment env,
+ Path securityDir) throws Exception {
+ var keystore = KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault());
+ var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
+ var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
+ var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault();
+ Path serverKeystore = securityDir.resolve(keystoreFile);
+ char[] serverKeyStorePassword = keystorePassword.toCharArray();
+
+ LOG.info("Loading KeyStore at {}", serverKeystore);
+
+ try (InputStream in = Files.newInputStream(serverKeystore)) {
+ keystore.load(in, serverKeyStorePassword);
+ }
+
+ Key clientPrivateKey = keystore.getKey(keystoreAlias, serverKeyStorePassword);
+ if (clientPrivateKey instanceof PrivateKey) {
+ clientCertificate = (X509Certificate) keystore.getCertificate(keystoreAlias);
+
+ clientCertificateChain = Arrays.stream(keystore.getCertificateChain(keystoreAlias))
+ .map(X509Certificate.class::cast)
+ .toArray(X509Certificate[]::new);
+
+ PublicKey serverPublicKey = clientCertificate.getPublicKey();
+ clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey);
+ }
+
+ return this;
+ }
+
+ public X509Certificate getClientCertificate() {
+ return clientCertificate;
+ }
+
+ public X509Certificate[] getClientCertificateChain() {
+ return clientCertificateChain;
+ }
+
+ public KeyPair getClientKeyPair() {
+ return clientKeyPair;
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
new file mode 100644
index 0000000..b64660e
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class SecurityConfig {
+
+ private final MessageSecurityMode securityMode;
+ private final SecurityPolicy securityPolicy;
+
+ public SecurityConfig(MessageSecurityMode securityMode,
+ SecurityPolicy securityPolicy) {
+ this.securityMode = securityMode;
+ this.securityPolicy = securityPolicy;
+ }
+
+ public void configureSecurityPolicy(String opcServerUrl,
+ List<EndpointDescription> endpoints,
+ OpcUaClientConfigBuilder builder)
+ throws SpConfigurationException, URISyntaxException {
+ String host = opcServerUrl.split("://")[1].split(":")[0];
+
+ EndpointDescription tmpEndpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityMode() == securityMode)
+ .filter(e -> e.getSecurityPolicyUri().equals(securityPolicy.getUri()))
+ .findFirst()
+ .orElseThrow(() ->
+ new SpConfigurationException("No endpoint available with security mode {} and security policy {}")
+ );
+
+ tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
+
+ if (securityMode != MessageSecurityMode.None) {
+ try {
+ var env = Environments.getEnvironment();
+ var securityDir = Paths.get(env.getOpcUaSecurityDir().getValueOrDefault());
+ var trustListManager = new DefaultTrustListManager(securityDir.resolve("pki").toFile());
+
+ var certificateValidator = new DefaultClientCertificateValidator(trustListManager);
+ var loader = new KeyStoreLoader().load(env, securityDir);
+ builder.setKeyPair(loader.getClientKeyPair());
+ builder.setCertificate(loader.getClientCertificate());
+ builder.setCertificateChain(loader.getClientCertificateChain());
+ builder.setCertificateValidator(certificateValidator);
+ } catch (Exception e) {
+ throw new SpConfigurationException(
+ "Failed to load keystore - check that all required environment variables "
+ + "are defined and the keystore exists",
+ e
+ );
+ }
+ }
+
+ builder.setEndpoint(tmpEndpoint);
+ }
+
+ private EndpointDescription updateEndpointUrl(EndpointDescription original,
+ String hostname) throws URISyntaxException {
+
+ URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
+
+ String endpointUrl = String.format("%s://%s:%s%s", uri.getScheme(), hostname, uri.getPort(), uri.getPath());
+
+ return new EndpointDescription(
+ endpointUrl,
+ original.getServer(),
+ original.getServerCertificate(),
+ original.getSecurityMode(),
+ original.getSecurityPolicyUri(),
+ original.getUserIdentityTokens(),
+ original.getTransportProfileUri(),
+ original.getSecurityLevel());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s-%s", securityMode, securityPolicy);
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
new file mode 100644
index 0000000..d4311bf
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import java.util.List;
+
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_MODE;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_POLICY;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
+
+public class OpcUaAdapterMigrationV4 implements IAdapterMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ OpcUaAdapter.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 3,
+ 4
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
+ IStaticPropertyExtractor extractor) throws RuntimeException {
+ var config = element.getConfig();
+ element.setConfig(migrate(config, extractor));
+
+ return MigrationResult.success(element);
+ }
+
+ public List<StaticProperty> migrate(List<StaticProperty> staticProperties,
+ IParameterExtractor extractor) {
+ var securityMode =
+ StaticProperties.singleValueSelection(
+ Labels.withId(SECURITY_MODE),
+ SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new Option(mode.k, mode.v)).toList()
+ );
+ securityMode.getOptions().get(0).setSelected(true);
+
+ var securityPolicy = StaticProperties.singleValueSelection(
+ Labels.withId(SECURITY_POLICY),
+ SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new Option(p.name())).toList()
+ );
+ securityPolicy.getOptions().get(0).setSelected(true);
+
+ boolean anonymous = true;
+ var currentAuthSettings = extractor.selectedAlternativeInternalId(
+ "ACCESS_MODE"
+ );
+ if (currentAuthSettings.equals("USERNAME_GROUP")) {
+ anonymous = false;
+ }
+ var authentication = StaticProperties.alternatives(Labels.withId(USER_AUTHENTICATION),
+ Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
+ Alternatives.from(Labels.withId(USERNAME_GROUP),
+ StaticProperties.group(
+ Labels.withId(USERNAME_GROUP),
+ StaticProperties.stringFreeTextProperty(
+ Labels.withId(USERNAME)),
+ StaticProperties.secretValue(Labels.withId(PASSWORD))
+ ))
+ );
+ if (anonymous) {
+ authentication.getAlternatives().get(0).setSelected(true);
+ } else {
+ authentication.getAlternatives().get(1).setSelected(true);
+ var username = extractor.singleValueParameter("USERNAME", String.class);
+ var password = extractor.secretValue("PASSWORD");
+ var group = (StaticPropertyGroup) authentication.getAlternatives().get(1).getStaticProperty();
+ ((FreeTextStaticProperty) group.getStaticProperties().get(0)).setValue(username);
+ ((SecretStaticProperty) group.getStaticProperties().get(1)).setValue(password);
+ ((SecretStaticProperty) group.getStaticProperties().get(1)).setEncrypted(false);
+ }
+
+ // remove old authentication property, add new properties for securityMode, policy and authentication options
+ staticProperties.remove(1);
+ staticProperties.add(1, securityMode);
+ staticProperties.add(2, securityPolicy);
+ staticProperties.add(3, authentication);
+
+ return staticProperties;
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
new file mode 100644
index 0000000..bc24f4d
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+
+public class OpcUaSinkMigrationV1 implements IDataSinkMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ OpcUaSink.ID,
+ SpServiceTagPrefix.DATA_SINK,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
+ IDataSinkParameterExtractor extractor) throws RuntimeException {
+ var config = element.getStaticProperties();
+ var migratedConfigs = new OpcUaAdapterMigrationV4().migrate(config, extractor);
+ element.setStaticProperties(migratedConfigs);
+ return MigrationResult.success(element);
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
index 78babe0..265fd80 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
@@ -19,12 +19,13 @@
package org.apache.streampipes.extensions.connectors.opcua.sink;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.vocabulary.XSD;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -42,7 +43,8 @@
private static final Logger LOG = LoggerFactory.getLogger(OpcUa.class);
- private OpcUaClient opcUaClient;
+ private ConnectedOpcUaClient connectedClient;
+ private final OpcUaConfig opcUaConfig;
private OpcUaParameters params;
private NodeId node;
@@ -73,24 +75,31 @@
compatibleDataTypes.put(String.class, new Class[]{String.class});
}
- public void onInvocation(OpcUaParameters params) throws
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUa(OpcUaClientProvider clientProvider,
+ OpcUaParameters params) {
+ this.clientProvider = clientProvider;
+ this.params = params;
+ this.opcUaConfig = params.config();
+ }
+
+ public void onInvocation() throws
SpRuntimeException {
try {
- this.params = params;
- this.node = NodeId.parse(params.getSelectedNode());
- opcUaClient = new SpOpcUaClient<>(params.getConfig()).getClient();
- opcUaClient.connect().get();
+ this.node = NodeId.parse(params.selectedNode());
+ this.connectedClient = clientProvider.getClient(opcUaConfig);
} catch (Exception e) {
- throw new SpRuntimeException("Could not connect to OPC-UA server: " + params.getConfig().getOpcServerURL());
+ throw new SpRuntimeException("Could not connect to OPC-UA server: " + params.config().getOpcServerURL());
}
// check whether input data type and target data type are compatible
try {
- Variant value = opcUaClient.getAddressSpace().getVariableNode(node).readValue().getValue();
+ Variant value = this.connectedClient.getClient().getAddressSpace().getVariableNode(node).readValue().getValue();
targetDataType = value.getValue().getClass();
- sourceDataType = XSDMatchings.get(params.getMappingPropertyType());
+ sourceDataType = XSDMatchings.get(params.mappingPropertyType());
if (!sourceDataType.equals(targetDataType)) {
if (Arrays.stream(compatibleDataTypes.get(sourceDataType)).noneMatch(dt -> dt.equals(targetDataType))) {
throw new SpRuntimeException("Data Type of event of target node are not compatible");
@@ -107,40 +116,45 @@
Variant v = getValue(inputEvent);
if (v == null) {
- LOG.error("Mapping property type: " + this.params.getMappingPropertyType() + " is not supported");
+ LOG.error("Mapping property type: " + this.params.mappingPropertyType() + " is not supported");
} else {
DataValue value = new DataValue(v);
- CompletableFuture<StatusCode> f = opcUaClient.writeValue(node, value);
+ CompletableFuture<StatusCode> f = this.connectedClient.getClient().writeValue(node, value);
try {
StatusCode status = f.get();
if (status.isBad()) {
if (status.getValue() == 0x80740000L) {
- LOG.error("Type missmatch! Tried to write value of type: " + this.params.getMappingPropertyType()
+ LOG.error("Type missmatch! Tried to write value of type {} ", this.params.mappingPropertyType()
+ " but server did not accept this");
} else if (status.getValue() == 0x803B0000L) {
LOG.error("Wrong access level. Not allowed to write to nodes");
}
LOG.error(
- "Value: " + value.getValue().toString() + " could not be written to node Id: "
- + node.getIdentifier() + " on " + "OPC-UA server: " + params.getConfig().getOpcServerURL());
+ "Value: {} could not be written to node Id {} on OPC-UA server {}",
+ value.getValue().toString(),
+ node.getIdentifier(),
+ params.config().getOpcServerURL());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.error("Exception: Value: " + value.getValue().toString() + " could not be written to node Id: "
- + node.getIdentifier() + " on " + "OPC-UA server: " + params.getConfig().getOpcServerURL());
+ LOG.error(
+ "Exception: Value {} could not be written to node Id {} on OPC_UA server {}",
+ value.getValue().toString(),
+ node.getIdentifier(),
+ params.config().getOpcServerURL());
}
}
}
public void onDetach() throws SpRuntimeException {
- opcUaClient.disconnect();
+ clientProvider.releaseClient(opcUaConfig);
}
private Variant getValue(Event inputEvent) {
Variant result = null;
PrimitiveField propertyPrimitive =
- inputEvent.getFieldBySelector(this.params.getMappingPropertySelector()).getAsPrimitive();
+ inputEvent.getFieldBySelector(this.params.mappingPropertySelector()).getAsPrimitive();
if (targetDataType.equals(Integer.class)) {
result = new Variant(propertyPrimitive.getAsInt());
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
index 15906af..438bf15 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
@@ -20,36 +20,8 @@
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
-public final class OpcUaParameters {
- private final String selectedNode;
- private final String mappingPropertySelector;
- private final String mappingPropertyType;
-
- private final OpcUaConfig config;
-
- public OpcUaParameters(OpcUaConfig config,
- String mappingPropertySelector,
- String mappingPropertyType,
- String selectedNode) {
- this.config = config;
- this.mappingPropertySelector = mappingPropertySelector;
- this.mappingPropertyType = mappingPropertyType;
- this.selectedNode = selectedNode;
- }
-
- public String getSelectedNode() {
- return selectedNode;
- }
-
- public String getMappingPropertySelector() {
- return mappingPropertySelector;
- }
-
- public String getMappingPropertyType() {
- return mappingPropertyType;
- }
-
- public OpcUaConfig getConfig() {
- return config;
- }
+public record OpcUaParameters(OpcUaConfig config,
+ String mappingPropertySelector,
+ String mappingPropertyType,
+ String selectedNode) {
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
index 50a6069..b81f20d 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
@@ -26,6 +26,7 @@
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
@@ -45,11 +46,18 @@
public class OpcUaSink implements IStreamPipesDataSink, SupportsRuntimeConfig {
+ public static final String ID = "org.apache.streampipes.sinks.databases.jvm.opcua";
+
private OpcUa opcUa;
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUaSink(OpcUaClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
+ }
@Override
public IDataSinkConfiguration declareConfig() {
- var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua", 0)
+ var builder = DataSinkBuilder.create(ID, 0)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.category(DataSinkType.FORWARD)
@@ -62,7 +70,7 @@
SharedUserConfiguration.appendSharedOpcUaConfig(builder, false);
return DataSinkConfiguration.create(
- OpcUaSink::new,
+ () -> new OpcUaSink(clientProvider),
builder.build()
);
}
@@ -89,8 +97,8 @@
config.getSelectedNodeNames().get(0)
);
- this.opcUa = new OpcUa();
- this.opcUa.onInvocation(params);
+ this.opcUa = new OpcUa(clientProvider, params);
+ this.opcUa.onInvocation();
}
@Override
@@ -106,6 +114,6 @@
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+ return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
}
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
index aa49a7a..8cd7640 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
@@ -25,8 +25,9 @@
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
@@ -37,7 +38,7 @@
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
@@ -78,7 +79,8 @@
* @throws AdapterException
* @throws ParseException
*/
- public static GuessSchema getSchema(IAdapterParameterExtractor extractor)
+ public static GuessSchema getSchema(OpcUaClientProvider clientProvider,
+ IAdapterParameterExtractor extractor)
throws AdapterException, ParseException {
var builder = GuessSchemaBuilder.create();
EventSchema eventSchema = new EventSchema();
@@ -86,14 +88,13 @@
Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
List<EventProperty> allProperties = new ArrayList<>();
- SpOpcUaClient<OpcUaConfig> spOpcUaClient = new SpOpcUaClient<>(
- SpOpcUaConfigExtractor.extractSharedConfig(extractor.getStaticPropertyExtractor(), new OpcUaConfig())
+ var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
+ extractor.getStaticPropertyExtractor(), new OpcUaConfig()
);
-
try {
- spOpcUaClient.connect();
+ var connectedClient = clientProvider.getClient(opcUaConfig);
OpcUaNodeBrowser nodeBrowser =
- new OpcUaNodeBrowser(spOpcUaClient.getClient(), spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
List<OpcNode> selectedNodes = nodeBrowser.findNodes();
if (!selectedNodes.isEmpty()) {
@@ -116,12 +117,12 @@
var nodeIds = selectedNodes.stream()
.map(OpcNode::getNodeId)
.collect(Collectors.toList());
- var response = spOpcUaClient.getClient()
+ var response = connectedClient.getClient()
.readValues(0, TimestampsToReturn.Both, nodeIds);
var returnValues = response.get();
- spOpcUaClient.disconnect();
+ //clientProvider.releaseClient(opcUaConfig);
makeEventPreview(selectedNodes, eventPreview, fieldStatusInfos, returnValues);
@@ -129,7 +130,9 @@
} catch (Exception e) {
throw new AdapterException("Could not guess schema for opc node: " + e.getMessage(), e);
} finally {
- spOpcUaClient.disconnect();
+ // TODO
+ //spOpcUaClient.disconnect();
+ clientProvider.releaseClient(opcUaConfig);
}
eventSchema.setEventProperties(allProperties);
@@ -172,7 +175,8 @@
* @param parameterExtractor to extract parameters from the OPC UA config
* @return {@code List<Option>} with available node names for the given OPC UA configuration
*/
- public static RuntimeResolvableTreeInputStaticProperty resolveConfig(String internalName,
+ public static RuntimeResolvableTreeInputStaticProperty resolveConfig(OpcUaClientProvider clientProvider,
+ String internalName,
IStaticPropertyExtractor parameterExtractor)
throws SpConfigurationException {
@@ -181,19 +185,18 @@
// access mode and host/url have to be selected
try {
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
- parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+ parameterExtractor.selectedSingleValueInternalName(SharedUserConfiguration.SECURITY_MODE, String.class);
+ parameterExtractor.selectedSingleValue(SharedUserConfiguration.SECURITY_POLICY, String.class);
} catch (NullPointerException nullPointerException) {
return config;
}
- SpOpcUaClient spOpcUaClient = new SpOpcUaClient(
- SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new OpcUaConfig())
- );
+ var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new OpcUaConfig());
try {
- spOpcUaClient.connect();
+ var connectedClient = clientProvider.getClient(opcUaConfig);
OpcUaNodeBrowser nodeBrowser =
- new OpcUaNodeBrowser(spOpcUaClient.getClient(), spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
var nodes = nodeBrowser.buildNodeTreeFromOrigin(config.getNextBaseNodeToResolve());
if (Objects.isNull(config.getNextBaseNodeToResolve())) {
@@ -204,7 +207,7 @@
if (!config.getSelectedNodesInternalNames().isEmpty()) {
config.setSelectedNodesInternalNames(
- filterMissingNodes(spOpcUaClient.getClient(), config.getSelectedNodesInternalNames())
+ filterMissingNodes(connectedClient.getClient(), config.getSelectedNodesInternalNames())
);
}
@@ -215,13 +218,15 @@
} catch (ExecutionException | InterruptedException | URISyntaxException e) {
throw new SpConfigurationException("Could not connect to the OPC UA server with the provided settings", e);
} finally {
- if (spOpcUaClient.getClient() != null) {
- spOpcUaClient.disconnect();
- }
+ clientProvider.releaseClient(opcUaConfig);
+ // TODO
+// if (spOpcUaClient.getClient() != null) {
+// spOpcUaClient.disconnect();
+// }
}
}
- public static List<String> filterMissingNodes(OpcUaClient opcUaClient,
+ public static List<String> filterMissingNodes(UaClient opcUaClient,
List<String> selectedNodes) {
return selectedNodes.stream().filter(selectedNode -> {
try {
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
new file mode 100644
index 0000000..576f3ee
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.utils;
+
+import org.apache.streampipes.model.Tuple2;
+
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
+import java.util.List;
+
+public class SecurityUtils {
+
+ public static List<Tuple2<String, String>> getAvailableSecurityModes() {
+ return List.of(
+ new Tuple2<>("None", MessageSecurityMode.None.name()),
+ new Tuple2<>("Sign", MessageSecurityMode.Sign.name()),
+ new Tuple2<>("Sign & Encrypt", MessageSecurityMode.SignAndEncrypt.name())
+ );
+ }
+
+ public static List<SecurityPolicy> getAvailableSecurityPolicies() {
+ return List.of(
+ SecurityPolicy.None,
+ SecurityPolicy.Basic128Rsa15,
+ SecurityPolicy.Basic256,
+ SecurityPolicy.Basic256Sha256,
+ SecurityPolicy.Aes128_Sha256_RsaOaep,
+ SecurityPolicy.Aes256_Sha256_RsaPss
+ );
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
index 079243a..185776e 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
@@ -26,7 +26,40 @@
## Description
-Reads values from an OPC-UA server repeatedly
+This adapter reads node values from an OPC-UA server.
+The adapter supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+āā pki/
+ā āā issuers/
+ā āā rejected/
+ā āā trusted/
+ā ā āā certs/
+ā ā āā crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
***
@@ -40,7 +73,15 @@
Duration of the polling interval in seconds
-### Anonymous vs. Username/Password
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
Choose whether you want to connect anonymously or authenticate using your credentials.
@@ -54,14 +95,6 @@
**URL**: Specify the server's full `URL` (including port), can be with our without leading `opc.tcp://`<br/>
**Host/Port**: Insert the `host` address (with or without leading `opc.tcp://`) and the `port`<br/>
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both valid.
-
### Available Nodes
Shows all available nodes once namespace index and node ID are given.
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index 995045e..15f54c5 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -38,10 +38,22 @@
OPC_SERVER_PORT.title=Port
OPC_SERVER_PORT.description=Example: 4840
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
+
ACCESS_MODE.title=Security Level
ACCESS_MODE.description=Select the OPC UA security level for the connection
-USERNAME_GROUP.title=Sign (username & password)
+USERNAME_GROUP.title=Username & Password
USERNAME_GROUP.description=
UNAUTHENTICATED.title=None
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
index 2722146..ee28512 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
@@ -26,7 +26,40 @@
## Description
-Allows to write events to an OPC-UA server.
+This data sink can be used to write values to an OPC-UA server.
+The sink supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+āā pki/
+ā āā issuers/
+ā āā rejected/
+ā āā trusted/
+ā ā āā certs/
+ā ā āā crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
***
@@ -46,6 +79,21 @@
The port of the OPC-UA server.
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
+
+Choose whether you want to connect anonymously or authenticate using your credentials.
+
+ **Anonymous**: No further information required <br/>
+ **Username/Password**: Insert your `username` and `password` to access the OPC UA server
+
### Namespace Index
The namespace index in which the node should be written
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
index 7e78d16..b26c09c 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
@@ -56,3 +56,15 @@
MAPPING_PROPERY.title=Field to write
MAPPING_PROPERY.description=The field that should be written to the OPC-UA server
+
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
index 4dd462b..6d82bcf 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.extensions.connectors.opcua.migration.config;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -52,7 +53,7 @@
public class OpcUaAdapterVersionedConfig {
public static AdapterDescription getOpcUaAdapterDescriptionV1(){
- var builder = AdapterConfigurationBuilder.create(ID, 1, OpcUaAdapter::new)
+ var builder = AdapterConfigurationBuilder.create(ID, 1, () -> new OpcUaAdapter(new OpcUaClientProvider()))
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
diff --git a/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml b/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
index b7730e2..d2aabf0 100644
--- a/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
@@ -198,6 +198,14 @@
<artifactId>classindex</artifactId>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcutil-jdk18on</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency>
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index 88cede6..e487c1a 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -229,6 +229,10 @@
<artifactId>classindex</artifactId>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency>
diff --git a/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml b/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
index b616216..5e93d4ea 100644
--- a/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
@@ -163,6 +163,10 @@
<artifactId>classindex</artifactId>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency>
diff --git a/ui/cypress/support/utils/connect/OpcUaUtils.ts b/ui/cypress/support/utils/connect/OpcUaUtils.ts
index 066cacf..31d3f7f 100644
--- a/ui/cypress/support/utils/connect/OpcUaUtils.ts
+++ b/ui/cypress/support/utils/connect/OpcUaUtils.ts
@@ -89,7 +89,8 @@
}
builder
- .addInput('radio', 'access_mode-none', '')
+ .addInput('radio', 'securitymode-none', '')
+ .addInput('radio', 'userauthentication-anonymous', '')
.addInput('radio', 'opc_host_or_url-url', '')
.addInput(
'input',
diff --git a/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts b/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
similarity index 97%
rename from ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts
rename to ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
index 96d2656..fc5aa64 100644
--- a/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts
+++ b/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
@@ -160,7 +160,8 @@
'undefined-pull-mode-group-0-PULLING_INTERVAL-0',
'1000',
)
- .addInput('radio', 'access_mode-none', '')
+ .addInput('radio', 'securitymode-none', '')
+ .addInput('radio', 'userauthentication-anonymous', '')
.addInput('radio', 'opc_host_or_url-url', '')
.addInput(
'input',
diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.html b/ui/src/app/connect/components/configuration-group/configuration-group.component.html
index 68569ad..282912a 100644
--- a/ui/src/app/connect/components/configuration-group/configuration-group.component.html
+++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.html
@@ -31,8 +31,10 @@
[adapterId]="adapterId"
[parentForm]="configurationGroup"
[fieldName]="config.internalName"
- (updateEmitter)="triggerUpdate($event)"
- [completedStaticProperty]="completedStaticProperty"
+ [completedConfigurations]="completedConfigurations"
+ (completedConfigurationsEmitter)="
+ updateCompletedConfiguration($event)
+ "
>
</sp-app-static-property>
</div>
diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
index 1e109be..e87c47c 100644
--- a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
+++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
@@ -16,20 +16,21 @@
*
*/
-import { Component, Input } from '@angular/core';
+import { Component, Input, OnInit } from '@angular/core';
import { UntypedFormGroup } from '@angular/forms';
import {
ExtensionDeploymentConfiguration,
StaticPropertyUnion,
} from '@streampipes/platform-services';
import { ConfigurationInfo } from '../../model/ConfigurationInfo';
+import { StaticPropertyUtilService } from '../../../core-ui/static-properties/static-property-util.service';
@Component({
selector: 'sp-configuration-group',
templateUrl: './configuration-group.component.html',
styleUrls: ['./configuration-group.component.scss'],
})
-export class ConfigurationGroupComponent {
+export class ConfigurationGroupComponent implements OnInit {
@Input() configurationGroup: UntypedFormGroup;
@Input() adapterId: string;
@@ -38,11 +39,22 @@
@Input() deploymentConfiguration: ExtensionDeploymentConfiguration;
- completedStaticProperty: ConfigurationInfo;
+ completedConfigurations: ConfigurationInfo[] = [];
- constructor() {}
+ constructor(private staticPropertyUtils: StaticPropertyUtilService) {}
- triggerUpdate(configurationInfo: ConfigurationInfo) {
- this.completedStaticProperty = { ...configurationInfo };
+ ngOnInit() {
+ this.completedConfigurations =
+ this.staticPropertyUtils.initializeCompletedConfigurations(
+ this.configuration,
+ );
+ }
+
+ updateCompletedConfiguration(configurationInfo: ConfigurationInfo) {
+ this.staticPropertyUtils.updateCompletedConfiguration(
+ configurationInfo,
+ this.completedConfigurations,
+ );
+ this.completedConfigurations = [...this.completedConfigurations];
}
}
diff --git a/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts b/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
index bc61dcd..2fc1164 100644
--- a/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
+++ b/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
@@ -21,10 +21,11 @@
StaticProperty,
StaticPropertyUnion,
} from '@streampipes/platform-services';
-import { Directive, EventEmitter, Input, Output } from '@angular/core';
+import { Directive, EventEmitter, inject, Input, Output } from '@angular/core';
import { UntypedFormGroup } from '@angular/forms';
import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo';
import { InvocablePipelineElementUnion } from '../../../editor/model/editor.model';
+import { StaticPropertyUtilService } from '../static-property-util.service';
@Directive()
// eslint-disable-next-line @angular-eslint/directive-class-suffix
@@ -53,14 +54,21 @@
@Input()
displayRecommended: boolean;
- @Output() updateEmitter: EventEmitter<ConfigurationInfo> =
+ @Input()
+ completedConfigurations: ConfigurationInfo[];
+
+ @Output()
+ completedConfigurationsEmitter: EventEmitter<ConfigurationInfo> =
new EventEmitter();
+ staticPropertyUtils = inject(StaticPropertyUtilService);
+
constructor() {}
- emitUpdate(valid?: boolean) {
- this.updateEmitter.emit(
- new ConfigurationInfo(this.staticProperty.internalName, valid),
- );
+ applyCompletedConfiguration(valid?: boolean) {
+ this.completedConfigurationsEmitter.emit({
+ staticPropertyInternalName: this.staticProperty.internalName,
+ configured: valid,
+ });
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
index 7ae838b..a68765e 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
@@ -61,6 +61,9 @@
<div style="padding: 0 10px">
<sp-app-static-property
[adapterId]="adapterId"
+ [completedConfigurations]="
+ completedAlternativeConfigurations
+ "
[deploymentConfiguration]="deploymentConfiguration"
[eventSchemas]="eventSchemas"
[parentForm]="parentForm"
@@ -74,7 +77,9 @@
"
[staticProperty]="alternative.staticProperty"
[displayRecommended]="displayRecommended"
- (updateEmitter)="handleConfigurationUpdate($event)"
+ (completedConfigurationsEmitter)="
+ handleConfigurationUpdate($event)
+ "
class="test fullWidth"
>
</sp-app-static-property>
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
index 6e4c668..886b67e 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
@@ -44,13 +44,12 @@
@Input()
deploymentConfiguration: ExtensionDeploymentConfiguration;
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
+ // dependentStaticPropertyIds: Map<string, boolean> = new Map<
+ // string,
+ // boolean
+ // >();
- completedStaticProperty: ConfigurationInfo;
- dependentStaticPropertyIds: Map<string, boolean> = new Map<
- string,
- boolean
- >();
+ completedAlternativeConfigurations: ConfigurationInfo[] = [];
constructor(private changeDetectorRef: ChangeDetectorRef) {
super();
@@ -59,10 +58,16 @@
ngOnInit() {
this.staticProperty.alternatives.forEach(al => {
if (al.staticProperty) {
- const configuration = al.staticProperty.internalName;
- this.dependentStaticPropertyIds.set(configuration, false);
+ this.completedAlternativeConfigurations.push({
+ staticPropertyInternalName: al.staticProperty.internalName,
+ configured: false,
+ });
}
});
+ if (!this.staticProperty.alternatives.some(a => a.selected)) {
+ this.staticProperty.alternatives[0].selected = true;
+ this.checkFireCompleted(this.staticProperty.alternatives[0]);
+ }
}
radioSelectionChange(event) {
@@ -75,21 +80,23 @@
}
handleConfigurationUpdate(configurationInfo: ConfigurationInfo) {
- this.dependentStaticPropertyIds.set(
- configurationInfo.staticPropertyInternalName,
- configurationInfo.configured,
+ this.staticPropertyUtils.updateCompletedConfiguration(
+ configurationInfo,
+ this.completedAlternativeConfigurations,
);
if (this.alternativeCompleted()) {
- this.completedStaticProperty = { ...configurationInfo };
- this.emitUpdate(true);
+ this.completedAlternativeConfigurations = [
+ ...this.completedAlternativeConfigurations,
+ ];
+ this.applyCompletedConfiguration(true);
} else {
- this.emitUpdate();
+ this.applyCompletedConfiguration(false);
}
}
checkFireCompleted(alternative: StaticPropertyAlternative) {
if (alternative.selected && alternative.staticProperty === null) {
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
}
@@ -102,9 +109,11 @@
if (al.staticProperty === null) {
return false;
} else {
- return this.dependentStaticPropertyIds.get(
- al.staticProperty.internalName,
- );
+ return this.completedAlternativeConfigurations.find(
+ c =>
+ c.staticPropertyInternalName ===
+ al.staticProperty.internalName,
+ ).configured;
}
}
}) !== undefined
diff --git a/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts b/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
index c28ebac..53d3de8 100644
--- a/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
@@ -25,16 +25,7 @@
templateUrl: './static-any-input.component.html',
styleUrls: ['./static-any-input.component.scss'],
})
-export class StaticAnyInputComponent
- extends AbstractStaticPropertyRenderer<AnyStaticProperty>
- implements OnInit
-{
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
- ngOnInit() {
- this.inputEmitter.emit(true);
- }
-
+export class StaticAnyInputComponent extends AbstractStaticPropertyRenderer<AnyStaticProperty> {
select(elementId: string) {
this.staticProperty.options
.filter(option => option.elementId === elementId)
diff --git a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
index 21df719..ab8e104 100644
--- a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
+++ b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
@@ -27,7 +27,7 @@
formControlName="{{ fieldName }}"
[style.background]="staticProperty.selectedColor"
required
- (blur)="emitUpdate()"
+ (blur)="checkCompleted()"
[cpPresetColors]="presetColors"
/>
</div>
diff --git a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
index c2ddd31..8717ea8 100644
--- a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
@@ -36,10 +36,6 @@
super();
}
- inputValue: String;
- hasInput: Boolean;
- colorPickerForm: UntypedFormGroup;
-
presetColors: any[] = [
'#39B54A',
'#1B1464',
@@ -58,17 +54,14 @@
this.enableValidators();
}
- emitUpdate() {
- this.updateEmitter.emit(
- new ConfigurationInfo(
- this.staticProperty.internalName,
+ checkCompleted() {
+ this.applyCompletedConfiguration(
+ this.staticPropertyUtil.asColorPickerStaticProperty(
+ this.staticProperty,
+ ).selectedColor &&
this.staticPropertyUtil.asColorPickerStaticProperty(
this.staticProperty,
- ).selectedColor &&
- this.staticPropertyUtil.asColorPickerStaticProperty(
- this.staticProperty,
- ).selectedColor !== '',
- ),
+ ).selectedColor !== '',
);
}
diff --git a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
index 600058a..69ed38b 100644
--- a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
@@ -38,8 +38,6 @@
extends AbstractValidatedStaticPropertyRenderer<FileStaticProperty>
implements OnInit
{
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
public chooseExistingFileControl = new UntypedFormControl();
dialogRef: MatDialogRef<FileRenameDialogComponent>;
@@ -99,7 +97,7 @@
fmi => fmi.filename === filenameToSelect,
);
this.selectOption(this.selectedFile);
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
this.parentForm.controls[this.fieldName].setValue(
this.selectedFile,
);
@@ -114,7 +112,7 @@
if (this.fileMetadata.length > 0) {
this.selectedFile = this.fileMetadata[0];
this.selectOption(this.selectedFile);
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
this.parentForm.controls[this.fieldName].setValue(
this.selectedFile,
);
@@ -172,9 +170,7 @@
this.staticProperty.locationPath = fileMetadata.filename;
const valid: boolean =
fileMetadata.filename !== '' || fileMetadata.filename !== undefined;
- this.updateEmitter.emit(
- new ConfigurationInfo(this.staticProperty.internalName, valid),
- );
+ this.applyCompletedConfiguration(valid);
}
displayFn(fileMetadata: FileMetadata) {
diff --git a/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts b/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
index 78be373..e71552b 100644
--- a/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
@@ -97,9 +97,7 @@
this.staticProperty.value !== undefined &&
this.staticProperty.value !== '' &&
this.staticProperty.value !== null;
- this.updateEmitter.emit(
- new ConfigurationInfo(this.staticProperty.internalName, valid),
- );
+ this.applyCompletedConfiguration(valid);
}
onStatusChange(status: any) {}
diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
index fb77d38..d321f4c 100644
--- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
+++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
@@ -28,7 +28,7 @@
[eventSchemas]="eventSchemas"
[staticProperty]="property"
[displayRecommended]="displayRecommended"
- (updateEmitter)="handleConfigurationUpdate($event)"
+ (completedConfigurationsEmitter)="handleConfigurationUpdate($event)"
class="test fullWidth"
>
</sp-app-static-property>
diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
index 45da778..73141ea 100644
--- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
@@ -36,8 +36,6 @@
@Input()
deploymentConfiguration: ExtensionDeploymentConfiguration;
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
dependentStaticProperties: Map<string, boolean> = new Map<
string,
boolean
@@ -53,9 +51,9 @@
v => v === true,
)
) {
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
} else {
- this.emitUpdate(false);
+ this.applyCompletedConfiguration(false);
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
index bd1e95d..c67a1e8 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
@@ -30,8 +30,6 @@
extends StaticMappingComponent<MappingPropertyNary>
implements OnInit
{
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
constructor(private displayRecommendedPipe: DisplayRecommendedPipe) {
super();
}
@@ -56,7 +54,6 @@
}
});
}
- this.inputEmitter.emit(true);
}
selectOption(property: any, $event) {
@@ -112,6 +109,6 @@
onStatusChange(status: any) {}
onValueChange(value: any) {
- this.emitUpdate();
+ this.applyCompletedConfiguration();
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
index d414502..db5cf86 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
@@ -30,8 +30,6 @@
extends StaticMappingComponent<MappingPropertyUnary>
implements OnInit
{
- @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
constructor() {
super();
}
@@ -41,7 +39,7 @@
if (!this.staticProperty.selectedProperty) {
this.staticProperty.selectedProperty =
this.availableProperties[0].propertySelector;
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
this.addValidator(
this.staticProperty.selectedProperty,
@@ -54,6 +52,6 @@
onValueChange(value: any) {
this.staticProperty.selectedProperty = value;
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts b/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
index 1d1b835..f05537f 100644
--- a/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
@@ -52,7 +52,7 @@
).elementId;
}
this.inputEmitter.emit(true);
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
this.parentForm.updateValueAndValidity();
}
@@ -71,6 +71,6 @@
option => option.elementId === id,
).selected = true;
this.inputEmitter.emit(true);
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-property-util.service.ts b/ui/src/app/core-ui/static-properties/static-property-util.service.ts
index b85882c..533cff0 100644
--- a/ui/src/app/core-ui/static-properties/static-property-util.service.ts
+++ b/ui/src/app/core-ui/static-properties/static-property-util.service.ts
@@ -37,11 +37,53 @@
StaticPropertyGroup,
} from '@streampipes/platform-services';
import { IdGeneratorService } from '../../core-services/id-generator/id-generator.service';
+import { ConfigurationInfo } from '../../connect/model/ConfigurationInfo';
@Injectable({ providedIn: 'root' })
export class StaticPropertyUtilService {
constructor(private idGeneratorService: IdGeneratorService) {}
+ public initializeCompletedConfigurations(
+ configs: StaticProperty[],
+ ): ConfigurationInfo[] {
+ return configs
+ .filter(config => !config.optional)
+ .map(config => {
+ return {
+ staticPropertyInternalName: config.internalName,
+ configured: false,
+ };
+ });
+ }
+
+ public allDependenciesSatisfied(
+ dependsOn: string[],
+ completedConfigs: ConfigurationInfo[],
+ ) {
+ if (dependsOn?.length > 0) {
+ return dependsOn.every(dependency =>
+ completedConfigs.some(
+ config =>
+ config.staticPropertyInternalName === dependency &&
+ config.configured,
+ ),
+ );
+ } else {
+ return true;
+ }
+ }
+
+ public updateCompletedConfiguration(
+ completedConfig: ConfigurationInfo,
+ completedConfigs: ConfigurationInfo[],
+ ) {
+ completedConfigs.find(
+ c =>
+ c.staticPropertyInternalName ===
+ completedConfig.staticPropertyInternalName,
+ ).configured = completedConfig.configured;
+ }
+
public clone(val: StaticProperty) {
let clone;
const id = this.idGeneratorService.generatePrefixedId();
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index 3f43c9b..fe6c2bf 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -37,78 +37,96 @@
<div fxFlex="100">
<sp-static-code-input
*ngIf="isCodeInputStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[eventSchemas]="eventSchemas"
[fieldName]="fieldName"
- (updateEmitter)="emitUpdate($event)"
>
</sp-static-code-input>
<sp-app-static-secret-input
*ngIf="isSecretStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[fieldName]="fieldName"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-secret-input>
<sp-app-static-free-input
*ngIf="isFreeTextStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[eventSchemas]="eventSchemas"
[fieldName]="fieldName"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-free-input>
<sp-static-file-input
*ngIf="isFileStaticProperty(staticProperty)"
- (inputEmitter)="valueChange($event)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[fieldName]="fieldName"
- (updateEmitter)="emitUpdate($event)"
[adapterId]="adapterId"
>
</sp-static-file-input>
<sp-app-static-color-picker
*ngIf="isColorPickerStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[fieldName]="fieldName"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-color-picker>
<sp-app-static-runtime-resolvable-any-input
*ngIf="isRuntimeResolvableAnyStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[deploymentConfiguration]="deploymentConfiguration"
[staticProperty]="staticProperty"
[staticProperties]="staticProperties"
[eventSchemas]="eventSchemas"
[pipelineElement]="pipelineElement"
[parentForm]="parentForm"
- [completedStaticProperty]="completedStaticProperty"
[adapterId]="adapterId"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-runtime-resolvable-any-input>
<sp-app-static-runtime-resolvable-oneof-input
*ngIf="isRuntimeResolvableOneOfStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[deploymentConfiguration]="deploymentConfiguration"
- [completedStaticProperty]="completedStaticProperty"
[pipelineElement]="pipelineElement"
[eventSchemas]="eventSchemas"
[staticProperty]="staticProperty"
[parentForm]="parentForm"
[staticProperties]="staticProperties"
[adapterId]="adapterId"
- (updateEmitter)="emitUpdate($event)"
></sp-app-static-runtime-resolvable-oneof-input>
<sp-app-static-any-input
@@ -116,7 +134,10 @@
isAnyStaticProperty(staticProperty) &&
!isRuntimeResolvableAnyStaticProperty(staticProperty)
"
- (inputEmitter)="valueChange($event)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[fieldName]="fieldName"
[staticProperty]="staticProperty"
>
@@ -127,8 +148,10 @@
isOneOfStaticProperty(staticProperty) &&
!isRuntimeResolvableOneOfStaticProperty(staticProperty)
"
- (inputEmitter)="valueChange($event)"
- (updateEmitter)="emitUpdate($event)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[parentForm]="parentForm"
[staticProperty]="staticProperty"
>
@@ -136,30 +159,38 @@
<sp-app-static-mapping-unary
*ngIf="isMappingPropertyUnary(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[eventSchemas]="eventSchemas"
[staticProperty]="staticProperty"
[displayRecommended]="displayRecommended"
[parentForm]="parentForm"
[fieldName]="fieldName"
- (inputEmitter)="valueChange($event)"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-mapping-unary>
<sp-app-static-mapping-nary
*ngIf="isMappingNaryProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[eventSchemas]="eventSchemas"
[parentForm]="parentForm"
[fieldName]="fieldName"
[staticProperty]="staticProperty"
[displayRecommended]="displayRecommended"
- (inputEmitter)="valueChange($event)"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-mapping-nary>
<sp-app-static-alternatives
*ngIf="isAlternativesStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[adapterId]="adapterId"
[deploymentConfiguration]="deploymentConfiguration"
[eventSchemas]="eventSchemas"
@@ -168,8 +199,6 @@
[staticProperties]="staticProperties"
[displayRecommended]="displayRecommended"
class="test fullWidth"
- (inputEmitter)="valueChange($event)"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-alternatives>
@@ -178,6 +207,10 @@
isGroupStaticProperty(staticProperty) &&
!isRuntimeResolvableGroupStaticProperty(staticProperty)
"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[adapterId]="adapterId"
[deploymentConfiguration]="deploymentConfiguration"
[eventSchemas]="eventSchemas"
@@ -187,12 +220,15 @@
[staticProperties]="staticProperties"
[displayRecommended]="displayRecommended"
class="test fullWidth"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-group>
<sp-app-static-runtime-resolvable-group
*ngIf="isRuntimeResolvableGroupStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[deploymentConfiguration]="deploymentConfiguration"
[adapterId]="adapterId"
[eventSchemas]="eventSchemas"
@@ -200,15 +236,17 @@
[fieldName]="fieldName"
[staticProperties]="staticProperties"
[staticProperty]="staticProperty"
- [completedStaticProperty]="completedStaticProperty"
[displayRecommended]="displayRecommended"
class="test fullWidth"
- (updateEmitter)="emitUpdate($event)"
>
</sp-app-static-runtime-resolvable-group>
<sp-static-collection
*ngIf="isCollectionStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[deploymentConfiguration]="deploymentConfiguration"
[adapterId]="adapterId"
[eventSchemas]="eventSchemas"
@@ -217,27 +255,32 @@
[displayRecommended]="displayRecommended"
[staticProperty]="staticProperty"
class="test fullWidth"
- (updateEmitter)="emitUpdate($event)"
>
</sp-static-collection>
<sp-static-slide-toggle
*ngIf="isSlideToggleStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[adapterId]="adapterId"
[parentForm]="parentForm"
[fieldName]="fieldName"
[displayRecommended]="displayRecommended"
[staticProperty]="staticProperty"
class="test fullWidth"
- (updateEmitter)="emitUpdate($event)"
>
</sp-static-slide-toggle>
<sp-static-runtime-resolvable-tree-input
*ngIf="isTreeInputStaticProperty(staticProperty)"
+ (completedConfigurationsEmitter)="
+ completedConfigurationsEmitter.emit($event)
+ "
+ [completedConfigurations]="completedConfigurations"
[deploymentConfiguration]="deploymentConfiguration"
[adapterId]="adapterId"
[parentForm]="parentForm"
- [completedStaticProperty]="completedStaticProperty"
[pipelineElement]="pipelineElement"
[eventSchemas]="eventSchemas"
[staticProperties]="staticProperties"
@@ -245,7 +288,6 @@
[displayRecommended]="displayRecommended"
[staticProperty]="staticProperty"
class="test fullWidth"
- (updateEmitter)="emitUpdate($event)"
>
</sp-static-runtime-resolvable-tree-input>
</div>
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.ts b/ui/src/app/core-ui/static-properties/static-property.component.ts
index ef3ddc9..5d4248d 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-property.component.ts
@@ -59,19 +59,10 @@
@Input()
adapterId: string;
- @Output()
- validateEmitter: EventEmitter<any> = new EventEmitter<any>();
-
- @Output()
- updateEmitter: EventEmitter<ConfigurationInfo> = new EventEmitter();
-
@Input()
eventSchemas: EventSchema[];
@Input()
- completedStaticProperty: ConfigurationInfo;
-
- @Input()
parentForm: UntypedFormGroup;
@Input()
@@ -86,6 +77,13 @@
@Input()
deploymentConfiguration: ExtensionDeploymentConfiguration;
+ @Input()
+ completedConfigurations: ConfigurationInfo[];
+
+ @Output()
+ completedConfigurationsEmitter: EventEmitter<ConfigurationInfo> =
+ new EventEmitter();
+
showLabel = true;
@Input()
@@ -167,12 +165,4 @@
isTreeInputStaticProperty(val) {
return val instanceof RuntimeResolvableTreeInputStaticProperty;
}
-
- valueChange(hasInput) {
- this.validateEmitter.emit();
- }
-
- emitUpdate(configurationInfo: ConfigurationInfo) {
- this.updateEmitter.emit(configurationInfo);
- }
}
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
index 6d84538..7b210ff 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
@@ -51,7 +51,7 @@
selectAll(select: boolean): void {
this.staticProperty.options.forEach(o => (o.selected = select));
this.selectedOptions = select ? this.staticProperty.options : [];
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
onSelectionChange(): void {
@@ -61,7 +61,7 @@
}
checkEmitUpdate(): void {
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
afterOptionsLoaded(staticProperty: RuntimeResolvableAnyStaticProperty) {
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
index 936c1c0..ec8fef5 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
@@ -52,7 +52,7 @@
if (this.staticProperty.staticProperties.length === 0) {
this.loadOptionsFromRestApi();
}
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
afterErrorReceived() {}
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
index 19e9a9c..564f2c2 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
@@ -47,34 +47,18 @@
extends AbstractStaticPropertyRenderer<T>
implements OnChanges
{
- @Input()
- completedStaticProperty: ConfigurationInfo;
-
@Input() deploymentConfiguration: ExtensionDeploymentConfiguration;
showOptions = false;
loading = false;
error = false;
errorMessage: SpLogMessage;
- dependentStaticProperties: Map<string, boolean> = new Map<
- string,
- boolean
- >();
constructor(private runtimeResolvableService: RuntimeResolvableService) {
super();
}
- onInit() {
- if (
- this.staticProperty.dependsOn &&
- this.staticProperty.dependsOn.length > 0
- ) {
- this.staticProperty.dependsOn.forEach(dp => {
- this.dependentStaticProperties.set(dp, false);
- });
- }
- }
+ onInit() {}
loadOptionsFromRestApi(node?: TreeInputNode) {
const resolvableOptionsParameterRequest = new RuntimeOptionsRequest();
@@ -137,31 +121,14 @@
}
ngOnChanges(changes: SimpleChanges): void {
- if (changes['completedStaticProperty']) {
+ if (changes['completedConfigurations']) {
if (
- this.completedStaticProperty !== undefined &&
- !(
- this.completedStaticProperty.staticPropertyInternalName ===
- this.staticProperty.internalName
+ this.staticPropertyUtils.allDependenciesSatisfied(
+ this.staticProperty.dependsOn,
+ this.completedConfigurations,
)
) {
- if (
- this.dependentStaticProperties.has(
- this.completedStaticProperty.staticPropertyInternalName,
- )
- ) {
- this.dependentStaticProperties.set(
- this.completedStaticProperty.staticPropertyInternalName,
- this.completedStaticProperty.configured,
- );
- }
- if (
- Array.from(this.dependentStaticProperties.values()).every(
- v => v === true,
- )
- ) {
- this.loadOptionsFromRestApi();
- }
+ this.loadOptionsFromRestApi();
}
}
}
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index 88f4dd4..eb6ffcc 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -54,7 +54,6 @@
this.staticProperty.options.length > 0
) {
this.staticProperty.options[0].selected = true;
- this.emitUpdate(true);
}
}
@@ -66,7 +65,7 @@
option => option.elementId === id,
).selected = true;
this.performValidation();
- this.emitUpdate(true);
+ this.applyCompletedConfiguration(true);
}
parse(
diff --git a/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts b/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
index 2caa19c..f085755 100644
--- a/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
@@ -36,25 +36,19 @@
super();
}
- @Output() updateEmitter: EventEmitter<ConfigurationInfo> =
- new EventEmitter();
-
ngOnInit() {
this.addValidator(this.staticProperty.value, Validators.required);
this.enableValidators();
}
emitUpdate() {
- this.updateEmitter.emit(
- new ConfigurationInfo(
- this.staticProperty.internalName,
+ this.applyCompletedConfiguration(
+ this.staticPropertyUtil.asFreeTextStaticProperty(
+ this.staticProperty,
+ ).value &&
this.staticPropertyUtil.asFreeTextStaticProperty(
this.staticProperty,
- ).value &&
- this.staticPropertyUtil.asFreeTextStaticProperty(
- this.staticProperty,
- ).value !== '',
- ),
+ ).value !== '',
);
}
diff --git a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
index e74274a..0b0dcce 100644
--- a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
@@ -37,9 +37,7 @@
}
emitUpdate() {
- this.updateEmitter.emit(
- new ConfigurationInfo(this.staticProperty.internalName, true),
- );
+ this.applyCompletedConfiguration(true);
}
onStatusChange(status: any) {}
diff --git a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
index c7c1a4b..8823d51 100644
--- a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
+++ b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
@@ -83,7 +83,11 @@
this.dataExplorerService.getAllPersistedDataStreams(),
this.datalakeRestService.getAllMeasurementSeries(),
).subscribe(response => {
- this.availablePipelines = response[0];
+ this.availablePipelines = response[0].filter(
+ p =>
+ response[1].find(m => m.measureName === p.measureName) !==
+ undefined,
+ );
this.availableMeasurements = response[1];
// replace pipeline event schemas. Reason: Available measures do not contain field for timestamp
diff --git a/ui/src/app/editor/dialog/customize/customize.component.html b/ui/src/app/editor/dialog/customize/customize.component.html
index 151d882..4bd6f29 100644
--- a/ui/src/app/editor/dialog/customize/customize.component.html
+++ b/ui/src/app/editor/dialog/customize/customize.component.html
@@ -93,12 +93,11 @@
[eventSchemas]="eventSchemas"
[parentForm]="parentForm"
[fieldName]="config.internalName"
- [completedStaticProperty]="
- completedStaticProperty
+ [completedConfigurations]="
+ completedConfigurations
"
- (updateEmitter)="triggerUpdate($event)"
- (validateEmitter)="
- validConfiguration($event)
+ (completedConfigurationsEmitter)="
+ updateCompletedConfiguration($event)
"
>
</sp-app-static-property>
diff --git a/ui/src/app/editor/dialog/customize/customize.component.ts b/ui/src/app/editor/dialog/customize/customize.component.ts
index ae2d73e..5b45b93 100644
--- a/ui/src/app/editor/dialog/customize/customize.component.ts
+++ b/ui/src/app/editor/dialog/customize/customize.component.ts
@@ -42,6 +42,7 @@
import { ShepherdService } from '../../../services/tour/shepherd.service';
import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo';
import { PipelineStyleService } from '../../services/pipeline-style.service';
+import { StaticPropertyUtilService } from '../../../core-ui/static-properties/static-property-util.service';
@Component({
selector: 'sp-customize-pipeline-element',
@@ -60,13 +61,7 @@
_showDocumentation = false;
selection: any;
- matchingSelectionLeft: any;
- matchingSelectionRight: any;
invalid: any;
- helpDialogVisible: any;
- validationErrors: any;
-
- sourceEndpoint: any;
sepa: any;
parentForm: UntypedFormGroup;
@@ -82,6 +77,7 @@
templateMode = false;
template: PipelineElementTemplate;
templateConfigs: Map<string, any>[] = [];
+ completedConfigurations: ConfigurationInfo[] = [];
constructor(
private dialogRef: DialogRef<CustomizeComponent>,
@@ -91,6 +87,7 @@
private changeDetectorRef: ChangeDetectorRef,
private pipelineElementTemplateService: PipelineElementTemplateService,
private pipelineStyleService: PipelineStyleService,
+ private staticPropertyUtils: StaticPropertyUtilService,
) {}
ngOnInit(): void {
@@ -98,6 +95,10 @@
this.cachedPipelineElement = this.jsPlumbService.clone(
this.pipelineElement.payload,
) as InvocablePipelineElementUnion;
+ this.completedConfigurations =
+ this.staticPropertyUtils.initializeCompletedConfigurations(
+ this.cachedPipelineElement.staticProperties,
+ );
this.isDataProcessor =
this.cachedPipelineElement instanceof DataProcessorInvocation;
this.cachedPipelineElement.inputStreams.forEach(is => {
@@ -147,8 +148,6 @@
this.dialogRef.close(this.pipelineElement);
}
- validConfiguration(event: any) {}
-
set showDocumentation(value: boolean) {
if (value) {
this.dialogRef.changeDialogSize({ width: '90vw' });
@@ -174,6 +173,14 @@
this.completedStaticProperty = { ...configurationInfo };
}
+ updateCompletedConfiguration(configurationInfo: ConfigurationInfo) {
+ this.staticPropertyUtils.updateCompletedConfiguration(
+ configurationInfo,
+ this.completedConfigurations,
+ );
+ this.completedConfigurations = [...this.completedConfigurations];
+ }
+
triggerTemplateMode() {
this.template = new PipelineElementTemplate();
this.templateMode = true;