feat(#3353): Support sign and encrypt security modes in OPC-UA adapteā€¦ (#3354)

* feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and sink

* Fix test

* Fix dependency convergence

* Modify OPC e2e tests

* Improve completed static property validation

* Minor bug fixes

* Fix test

* Add additional environment variables
diff --git a/pom.xml b/pom.xml
index c1285c0..8f0d8ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
         <amqp-client.version>5.21.0</amqp-client.version>
         <apache-sis-referencing.version>1.2</apache-sis-referencing.version>
         <boofcv.version>1.1.0</boofcv.version>
+        <bcprov-jdk18on.version>1.78.1</bcprov-jdk18on.version>
         <classindex.version>3.9</classindex.version>
         <checker-qual.version>3.43.0</checker-qual.version>
         <commons-codec.version>1.17.0</commons-codec.version>
@@ -54,7 +55,7 @@
         <commons-pool2.version>2.12.0</commons-pool2.version>
         <commons-text.version>1.12.0</commons-text.version>
         <ditto-client.version>1.0.0</ditto-client.version>
-        <eclipse.milo.version>0.6.9</eclipse.milo.version>
+        <eclipse.milo.version>0.6.14</eclipse.milo.version>
         <file-management.version>3.1.0</file-management.version>
         <flink.version>1.13.5</flink.version>
         <fogsy-qudt.version>1.0</fogsy-qudt.version>
@@ -399,6 +400,16 @@
                 </exclusions>
             </dependency>
             <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcprov-jdk18on</artifactId>
+                <version>${bcprov-jdk18on.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcutil-jdk18on</artifactId>
+                <version>${bcprov-jdk18on.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.eclipse.rdf4j</groupId>
                 <artifactId>rdf4j-rio-turtle</artifactId>
                 <version>${rdf4j.version}</version>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7ad56fe..7a2d033 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -108,7 +108,18 @@
 
   // expects a comma separated string of service names
   SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
-  SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
+  SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),
+
+  // OPC-UA security
+  SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/streampipes-security/opcua"),
+  SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
+  SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
+  SP_OPCUA_KEYSTORE_TYPE("SP_OPCUA_KEYSTORE_TYPE", "PKCS12"),
+  SP_OPCUA_KEYSTORE_ALIAS("SP_OPCUA_KEYSTORE_ALIAS", "apache-streampipes"),
+  SP_OPCUA_APPLICATION_URI(
+      "SP_OPCUA_APPLICATION_URI",
+      "urn:org:apache:streampipes:opcua:client"
+  );
 
   private final String envVariableName;
   private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 36ed402..bb5bbac 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -328,4 +328,33 @@
     return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
   }
 
+  @Override
+  public StringEnvironmentVariable getOpcUaSecurityDir() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaKeystoreFile() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaKeystorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaApplicationUri() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOPcUaKeystoreType() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaKeystoreAlias() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
+  }
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1d6efa..cb441f0 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -36,7 +36,9 @@
   IntEnvironmentVariable getServicePort();
 
   StringEnvironmentVariable getSpCoreScheme();
+
   StringEnvironmentVariable getSpCoreHost();
+
   IntEnvironmentVariable getSpCorePort();
 
   // Time series storage env variables
@@ -144,12 +146,15 @@
 
   // Broker defaults
   StringEnvironmentVariable getKafkaHost();
+
   IntEnvironmentVariable getKafkaPort();
 
   StringEnvironmentVariable getMqttHost();
+
   IntEnvironmentVariable getMqttPort();
 
   StringEnvironmentVariable getNatsHost();
+
   IntEnvironmentVariable getNatsPort();
 
   StringEnvironmentVariable getPulsarUrl();
@@ -158,4 +163,15 @@
 
   StringEnvironmentVariable getAllowedUploadFiletypes();
 
+  StringEnvironmentVariable getOpcUaSecurityDir();
+
+  StringEnvironmentVariable getOpcUaKeystoreFile();
+
+  StringEnvironmentVariable getOpcUaKeystorePassword();
+
+  StringEnvironmentVariable getOpcUaApplicationUri();
+
+  StringEnvironmentVariable getOPcUaKeystoreType();
+
+  StringEnvironmentVariable getOpcUaKeystoreAlias();
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/pom.xml b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
index 71eaf09..673e685 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
@@ -21,9 +21,9 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.streampipes</groupId>
-        <artifactId>streampipes-parent</artifactId>
+        <artifactId>streampipes-extensions</artifactId>
         <version>0.97.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
     <artifactId>streampipes-connectors-opcua</artifactId>
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index 63aacda..03fc844 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -23,25 +23,35 @@
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
 import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
 import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
+import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
+import org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
 import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
 
 import java.util.List;
 
 public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {
+
+  private final OpcUaClientProvider clientProvider;
+
+  public OpcUaConnectorsModuleExport() {
+    this.clientProvider = new OpcUaClientProvider();
+  }
+
   @Override
   public List<StreamPipesAdapter> adapters() {
     return List.of(
-        new OpcUaAdapter()
+        new OpcUaAdapter(clientProvider)
     );
   }
 
   @Override
   public List<IStreamPipesPipelineElement<?>> pipelineElements() {
     return List.of(
-        new OpcUaSink()
+        new OpcUaSink(clientProvider)
     );
   }
 
@@ -50,7 +60,9 @@
     return List.of(
         new OpcUaAdapterMigrationV1(),
         new OpcUaAdapterMigrationV2(),
-        new OpcUaAdapterMigrationV3()
+        new OpcUaAdapterMigrationV3(),
+        new OpcUaAdapterMigrationV4(),
+        new OpcUaSinkMigrationV1()
     );
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index baf5cfe..526ff7b 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -29,7 +29,8 @@
 import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
 import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
@@ -42,7 +43,6 @@
 import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -66,7 +66,6 @@
 import java.util.stream.Collectors;
 
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
@@ -78,7 +77,9 @@
   private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);
 
   private int pullingIntervalMilliSeconds;
-  private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
+  private final OpcUaClientProvider clientProvider;
+  private ConnectedOpcUaClient connectedClient;
+  private OpcUaAdapterConfig opcUaAdapterConfig;
   private List<OpcNode> allNodes;
   private List<NodeId> allNodeIds;
   private int numberProperties;
@@ -92,15 +93,14 @@
    */
   private final Map<String, String> nodeIdToLabelMapping;
 
-  public OpcUaAdapter() {
-    super();
+  public OpcUaAdapter(OpcUaClientProvider clientProvider) {
+    this.clientProvider = clientProvider;
     this.numberProperties = 0;
     this.event = new HashMap<>();
     this.nodeIdToLabelMapping = new HashMap<>();
   }
 
   private void prepareAdapter(IAdapterParameterExtractor extractor) throws AdapterException {
-
     this.allNodeIds = new ArrayList<>();
     List<String> deleteKeys = extractor
         .getAdapterDescription()
@@ -111,9 +111,9 @@
         .collect(Collectors.toList());
 
     try {
-      this.spOpcUaClient.connect();
+      this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
       OpcUaNodeBrowser browserClient =
-          new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(this.connectedClient.getClient(), this.opcUaAdapterConfig);
       this.allNodes = browserClient.findNodes(deleteKeys);
 
 
@@ -121,11 +121,11 @@
         this.allNodeIds.add(node.getNodeId());
       }
 
-      if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
-        this.pullingIntervalMilliSeconds = spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
+      if (opcUaAdapterConfig.inPullMode()) {
+        this.pullingIntervalMilliSeconds = opcUaAdapterConfig.getPullIntervalMilliSeconds();
       } else {
         this.numberProperties = this.allNodeIds.size();
-        this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+        this.connectedClient.createListSubscription(this.allNodeIds, this);
       }
 
       this.allNodes.forEach(node -> this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
@@ -139,7 +139,7 @@
   @Override
   public void pullData() throws ExecutionException, RuntimeException, InterruptedException, TimeoutException {
     var response =
-        this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
+        this.connectedClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
     boolean badStatusCodeReceived = false;
     boolean emptyValueReceived = false;
     List<DataValue> returnValues =
@@ -168,7 +168,7 @@
 
   private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
     return badStatusCodeReceived
-        && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+        && this.opcUaAdapterConfig.getIncompleteEventStrategy()
         .equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
   }
 
@@ -208,13 +208,13 @@
   public void onAdapterStarted(IAdapterParameterExtractor extractor,
                                IEventCollector collector,
                                IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
-    this.spOpcUaClient = new SpOpcUaClient<>(
-        SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
-    );
+    this.opcUaAdapterConfig =
+        SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
+    //this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
     this.collector = collector;
     this.prepareAdapter(extractor);
 
-    if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+    if (this.opcUaAdapterConfig.inPullMode()) {
       this.pullAdapterScheduler = new PullAdapterScheduler();
       this.pullAdapterScheduler.schedule(this, extractor.getAdapterDescription().getElementId());
     }
@@ -223,9 +223,9 @@
   @Override
   public void onAdapterStopped(IAdapterParameterExtractor extractor,
                                IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
-    this.spOpcUaClient.disconnect();
+    clientProvider.releaseClient(this.opcUaAdapterConfig);
 
-    if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+    if (this.opcUaAdapterConfig.inPullMode()) {
       this.pullAdapterScheduler.shutdown();
     }
   }
@@ -233,12 +233,12 @@
   @Override
   public StaticProperty resolveConfiguration(String staticPropertyInternalName,
                                              IStaticPropertyExtractor extractor) throws SpConfigurationException {
-    return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+    return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
   }
 
   @Override
   public IAdapterConfiguration declareConfig() {
-    var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
+    var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new OpcUaAdapter(clientProvider))
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
@@ -255,6 +255,6 @@
   @Override
   public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
                                        IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
-    return getSchema(extractor);
+    return getSchema(clientProvider, extractor);
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
index e7dde9d..2a150fa 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
@@ -25,6 +25,7 @@
 
 import org.eclipse.milo.opcua.sdk.client.AddressSpace;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -45,13 +46,13 @@
 
 public class OpcUaNodeBrowser {
 
-  private final OpcUaClient client;
+  private final UaClient client;
   private final OpcUaConfig spOpcConfig;
 
   private static final Logger LOG = LoggerFactory.getLogger(OpcUaNodeBrowser.class);
 
   public OpcUaNodeBrowser(
-      OpcUaClient client,
+      UaClient client,
       OpcUaConfig spOpcUaClientConfig
   ) {
     this.client = client;
@@ -127,7 +128,7 @@
   }
 
   private List<TreeInputNode> findChildren(
-      OpcUaClient client,
+      UaClient client,
       NodeId nodeId
   ) throws UaException {
     return client
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
index 04dda6e..bcaf11a 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.adapter;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.stack.core.StatusCodes;
@@ -32,12 +32,12 @@
 import java.util.concurrent.ExecutionException;
 
 public class OpcUaNodeMetadataExtractor {
-  private final OpcUaClient client;
+  private final UaClient client;
   private final UaNode node;
 
   private final Map<String, Object> metadata;
 
-  public OpcUaNodeMetadataExtractor(OpcUaClient client, UaNode node) {
+  public OpcUaNodeMetadataExtractor(UaClient client, UaNode node) {
     this.client = client;
     this.node = node;
     this.metadata = new HashMap<>();
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
new file mode 100644
index 0000000..d71d856
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ConnectedOpcUaClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectedOpcUaClient.class);
+  private final UaClient client;
+  private static final AtomicLong clientHandles = new AtomicLong(1L);
+
+  public ConnectedOpcUaClient(UaClient client) {
+    this.client = client;
+  }
+
+  /***
+   * Register subscriptions for given OPC UA nodes
+   * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
+   * @param opcUaAdapter current instance of {@link OpcUaAdapter}
+   * @throws Exception
+   */
+  public void createListSubscription(List<NodeId> nodes,
+                                     OpcUaAdapter opcUaAdapter) throws Exception {
+    client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
+      @Override
+      public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
+        LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
+        try {
+          initSubscription(nodes, opcUaAdapter);
+        } catch (Exception e) {
+          LOG.error("Re-creating the subscription failed", e);
+        }
+      }
+    });
+
+    initSubscription(nodes, opcUaAdapter);
+  }
+
+
+  public void initSubscription(List<NodeId> nodes,
+                               OpcUaAdapter opcUaAdapter) throws Exception {
+    /*
+     * create a subscription @ 1000ms
+     */
+    UaSubscription subscription = this.client.getSubscriptionManager().createSubscription(1000.0).get();
+
+    List<CompletableFuture<DataValue>> values = new ArrayList<>();
+
+    for (NodeId node : nodes) {
+      values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
+    }
+
+    for (CompletableFuture<DataValue> value : values) {
+      if (value.get().getValue().toString().contains("null")) {
+        LOG.error("Node has no value");
+      }
+    }
+
+
+    List<ReadValueId> readValues = new ArrayList<>();
+    // Read a specific value attribute
+    for (NodeId node : nodes) {
+      readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE));
+    }
+
+    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
+
+    for (ReadValueId readValue : readValues) {
+      // important: client handle must be unique per item
+      UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+      MonitoringParameters parameters = new MonitoringParameters(
+          clientHandle,
+          1000.0,     // sampling interval
+          null,      // filter, null means use default
+          uint(10),   // queue size
+          true         // discard oldest
+      );
+
+      requests.add(new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters));
+    }
+
+    UaSubscription.ItemCreationCallback onItemCreated =
+        (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
+    List<UaMonitoredItem> items = subscription.createMonitoredItems(
+        TimestampsToReturn.Both,
+        requests,
+        onItemCreated
+    ).get();
+
+    for (UaMonitoredItem item : items) {
+      NodeId tagId = item.getReadValueId().getNodeId();
+      if (item.getStatusCode().isGood()) {
+        LOG.info("item created for nodeId=" + tagId);
+      } else {
+        LOG.error("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
+      }
+    }
+  }
+
+  /***
+   *
+   * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
+   */
+  public UaClient getClient() {
+    return this.client;
+  }
+
+  public void disconnect() {
+    client.disconnect();
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
new file mode 100644
index 0000000..b3f907b
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+public class OpcUaClientProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OpcUaClientProvider.class);
+
+  private final Map<String, ConnectedOpcUaClient> clients = new ConcurrentHashMap<>();
+  private final Map<String, Integer> consumers = new ConcurrentHashMap<>();
+
+  public synchronized <T extends OpcUaConfig> ConnectedOpcUaClient getClient(T config)
+      throws UaException, SpConfigurationException, URISyntaxException, ExecutionException, InterruptedException {
+    var serverId = config.getUniqueServerId();
+    if (clients.containsKey(serverId)) {
+      LOG.debug("Adding new consumer to client {}", serverId);
+      consumers.put(serverId, consumers.get(config.getUniqueServerId()) + 1);
+      return clients.get(serverId);
+    } else {
+      LOG.debug("Creating new client {}", serverId);
+      var connectedClient = new SpOpcUaClient<>(config).connect();
+      clients.put(serverId, connectedClient);
+      consumers.put(serverId, 1);
+      return connectedClient;
+    }
+  }
+
+  public <T extends OpcUaConfig> void releaseClient(T config) {
+    String serverId = config.getUniqueServerId();
+    LOG.debug("Releasing client {}", serverId);
+
+    synchronized (this) {
+      consumers.computeIfPresent(serverId, (key, count) -> {
+        int updatedCount = count - 1;
+        if (updatedCount <= 0) {
+          LOG.debug("Disconnecting client {}", serverId);
+          if (clients.containsKey(serverId)) {
+            clients.get(serverId).disconnect();
+            clients.remove(serverId);
+          }
+          return null;
+        }
+        return updatedCount;
+      });
+    }
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
index 95fda81..a17d373 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
@@ -20,38 +20,17 @@
 
 
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
-import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
 import org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
-import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
-import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
 
 /***
  * Wrapper class for all OPC UA specific stuff.
@@ -63,120 +42,26 @@
   private OpcUaClient client;
   private final T spOpcConfig;
 
-  private static final AtomicLong clientHandles = new AtomicLong(1L);
-
   public SpOpcUaClient(T config) {
     this.spOpcConfig = config;
   }
 
   /***
-   *
-   * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
-   */
-  public OpcUaClient getClient() {
-    return this.client;
-  }
-
-  /***
    * Establishes appropriate connection to OPC UA endpoint depending on the {@link SpOpcUaClient} instance
    *
    * @throws UaException An exception occurring during OPC connection
    */
-  public void connect()
+  public ConnectedOpcUaClient connect()
       throws UaException, ExecutionException, InterruptedException, SpConfigurationException, URISyntaxException {
     OpcUaClientConfig clientConfig = new MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
-    this.client = OpcUaClient.create(clientConfig);
+    var client = OpcUaClient.create(clientConfig);
     client.connect().get();
-  }
-
-  public void disconnect() {
-    client.disconnect();
-  }
-
-  /***
-   * Register subscriptions for given OPC UA nodes
-   * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
-   * @param opcUaAdapter current instance of {@link OpcUaAdapter}
-   * @throws Exception
-   */
-  public void createListSubscription(List<NodeId> nodes,
-                                     OpcUaAdapter opcUaAdapter) throws Exception {
-    client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
-      @Override
-      public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
-        LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
-        try {
-          initSubscription(nodes, opcUaAdapter);
-        } catch (Exception e) {
-          LOG.error("Re-creating the subscription failed", e);
-        }
-      }
-    });
-
-    initSubscription(nodes, opcUaAdapter);
+    return new ConnectedOpcUaClient(client);
   }
 
 
-  public void initSubscription(List<NodeId> nodes,
-                               OpcUaAdapter opcUaAdapter) throws Exception {
-    /*
-     * create a subscription @ 1000ms
-     */
-    UaSubscription subscription = this.client.getSubscriptionManager().createSubscription(1000.0).get();
-
-    List<CompletableFuture<DataValue>> values = new ArrayList<>();
-
-    for (NodeId node : nodes) {
-      values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
-    }
-
-    for (CompletableFuture<DataValue> value : values) {
-      if (value.get().getValue().toString().contains("null")) {
-        LOG.error("Node has no value");
-      }
-    }
 
 
-    List<ReadValueId> readValues = new ArrayList<>();
-    // Read a specific value attribute
-    for (NodeId node : nodes) {
-      readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE));
-    }
-
-    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
-
-    for (ReadValueId readValue : readValues) {
-      // important: client handle must be unique per item
-      UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
-      MonitoringParameters parameters = new MonitoringParameters(
-          clientHandle,
-          1000.0,     // sampling interval
-          null,      // filter, null means use default
-          uint(10),   // queue size
-          true         // discard oldest
-      );
-
-      requests.add(new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters));
-    }
-
-    UaSubscription.ItemCreationCallback onItemCreated =
-        (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
-    List<UaMonitoredItem> items = subscription.createMonitoredItems(
-        TimestampsToReturn.Both,
-        requests,
-        onItemCreated
-    ).get();
-
-    for (UaMonitoredItem item : items) {
-      NodeId tagId = item.getReadValueId().getNodeId();
-      if (item.getStatusCode().isGood()) {
-        LOG.info("item created for nodeId=" + tagId);
-      } else {
-        LOG.error("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
-      }
-    }
-  }
 
   public T getSpOpcConfig() {
     return spOpcConfig;
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
index 9813d52..8db41a8 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
@@ -18,19 +18,15 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
-import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
 import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
 
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -39,65 +35,16 @@
   public OpcUaClientConfig makeClientConfig(OpcUaConfig spOpcConfig)
       throws ExecutionException, InterruptedException, SpConfigurationException, URISyntaxException {
     String opcServerUrl = spOpcConfig.getOpcServerURL();
+    String applicationUri = Environments.getEnvironment().getOpcUaApplicationUri().getValueOrDefault();
     List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(opcServerUrl).get();
-    String host = opcServerUrl.split("://")[1].split(":")[0];
 
-    EndpointDescription tmpEndpoint = endpoints
-        .stream()
-        .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
-        .findFirst()
-        .orElseThrow(() -> new SpConfigurationException("No endpoint with security policy none"));
+    var builder = OpcUaClientConfig.builder()
+        .setApplicationName(LocalizedText.english("Apache StreamPipes"))
+        .setApplicationUri(applicationUri);
 
-    tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
-    endpoints = Collections.singletonList(tmpEndpoint);
+    spOpcConfig.getSecurityConfig().configureSecurityPolicy(opcServerUrl, endpoints, builder);
+    spOpcConfig.getIdentityConfig().configureIdentity(builder);
 
-    EndpointDescription endpoint = endpoints
-        .stream()
-        .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
-        .findFirst().orElseThrow(() -> new SpConfigurationException("no desired endpoints returned"));
-
-    return buildConfig(endpoint, spOpcConfig);
-  }
-
-  private OpcUaClientConfig buildConfig(EndpointDescription endpoint,
-                                        OpcUaConfig spOpcConfig) {
-
-    OpcUaClientConfigBuilder builder = defaultBuilder(endpoint);
-    if (!spOpcConfig.isUnauthenticated()) {
-      builder.setIdentityProvider(new UsernameProvider(spOpcConfig.getUsername(), spOpcConfig.getPassword()));
-    }
     return builder.build();
   }
-
-  private OpcUaClientConfigBuilder defaultBuilder(EndpointDescription endpoint) {
-    return OpcUaClientConfig.builder()
-        .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
-        .setApplicationUri("urn:eclipse:milo:examples:client")
-        .setEndpoint(endpoint);
-  }
-
-  private EndpointDescription updateEndpointUrl(
-      EndpointDescription original, String hostname) throws URISyntaxException {
-
-    URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
-    String endpointUrl = String.format(
-        "%s://%s:%s%s",
-        uri.getScheme(),
-        hostname,
-        uri.getPort(),
-        uri.getPath()
-    );
-
-    return new EndpointDescription(
-        endpointUrl,
-        original.getServer(),
-        original.getServerCertificate(),
-        original.getSecurityMode(),
-        original.getSecurityPolicyUri(),
-        original.getUserIdentityTokens(),
-        original.getTransportProfileUri(),
-        original.getSecurityLevel()
-    );
-  }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
index 6db027d..128f1bc 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
@@ -18,15 +18,21 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.IdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
+
 import java.util.List;
 
 public class OpcUaConfig {
 
   private String opcServerURL;
-  private boolean unauthenticated;
-  private String username;
-  private String password;
   private List<String> selectedNodeNames;
+  private IdentityConfig identityConfig;
+  private SecurityConfig securityPolicyConfig;
+
+  public OpcUaConfig() {
+
+  }
 
   public String getOpcServerURL() {
     return opcServerURL;
@@ -36,30 +42,6 @@
     this.opcServerURL = opcServerURL;
   }
 
-  public boolean isUnauthenticated() {
-    return unauthenticated;
-  }
-
-  public void setUnauthenticated(boolean unauthenticated) {
-    this.unauthenticated = unauthenticated;
-  }
-
-  public String getUsername() {
-    return username;
-  }
-
-  public void setUsername(String username) {
-    this.username = username;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public void setPassword(String password) {
-    this.password = password;
-  }
-
   public List<String> getSelectedNodeNames() {
     return selectedNodeNames;
   }
@@ -67,4 +49,24 @@
   public void setSelectedNodeNames(List<String> selectedNodeNames) {
     this.selectedNodeNames = selectedNodeNames;
   }
+
+  public IdentityConfig getIdentityConfig() {
+    return identityConfig;
+  }
+
+  public void setIdentityConfig(IdentityConfig identityConfig) {
+    this.identityConfig = identityConfig;
+  }
+
+  public SecurityConfig getSecurityConfig() {
+    return securityPolicyConfig;
+  }
+
+  public void setSecurityConfig(SecurityConfig securityPolicyConfig) {
+    this.securityPolicyConfig = securityPolicyConfig;
+  }
+
+  public String getUniqueServerId() {
+    return String.format("%s-%s-%s", opcServerURL, securityPolicyConfig, identityConfig);
+  }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index 9b5f541..1059ce7 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
 import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
@@ -29,7 +30,6 @@
 import java.util.List;
 
 import static org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.HOST_PORT;
@@ -41,7 +41,6 @@
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
 
@@ -51,14 +50,27 @@
   public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
   public static final String INCOMPLETE_OPTION_SEND = "send-event";
 
+  public static final String SECURITY_MODE = "securityMode";
+  public static final String SECURITY_POLICY = "securityPolicy";
+  public static final String USER_AUTHENTICATION = "userAuthentication";
+  public static final String USER_AUTHENTICATION_ANONYMOUS = "anonymous";
+
   public static void appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?> builder,
                                              boolean adapterConfig) {
 
     var dependsOn = getDependsOn(adapterConfig);
 
     builder
-        .requiredAlternatives(Labels.withId(ACCESS_MODE),
-            Alternatives.from(Labels.withId(UNAUTHENTICATED)),
+        .requiredSingleValueSelection(
+            Labels.withId(SECURITY_MODE),
+            SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new Option(mode.k, mode.v)).toList()
+        )
+        .requiredSingleValueSelection(
+            Labels.withId(SECURITY_POLICY),
+            SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new Option(p.name())).toList()
+        )
+        .requiredAlternatives(Labels.withId(USER_AUTHENTICATION),
+            Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
             Alternatives.from(Labels.withId(USERNAME_GROUP),
                 StaticProperties.group(
                     Labels.withId(USERNAME_GROUP),
@@ -104,7 +116,7 @@
 
   public static OneOfStaticProperty getIncompleteEventConfig() {
     return StaticProperties.singleValueSelection(
-      Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+        Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
         List.of(
             new Option("Ignore (only complete messages are sent)", INCOMPLETE_OPTION_IGNORE),
             new Option("Send (incomplete messages are sent)", INCOMPLETE_OPTION_SEND)
@@ -115,10 +127,12 @@
   public static List<String> getDependsOn(boolean adapterConfig) {
     return adapterConfig ? List.of(
         ADAPTER_TYPE.name(),
-        ACCESS_MODE.name(),
+        SECURITY_MODE,
+        SECURITY_POLICY,
         OPC_HOST_OR_URL.name()
     ) : List.of(
-        ACCESS_MODE.name(),
+        SECURITY_MODE,
+        SECURITY_POLICY,
         OPC_HOST_OR_URL.name());
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 6741a3e..333dbad 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -20,11 +20,16 @@
 
 import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
 import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
 import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
 
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
 import java.util.List;
 
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_HOST_OR_URL;
@@ -35,7 +40,6 @@
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
-import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
 import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
 
 public class SpOpcUaConfigExtractor {
@@ -71,20 +75,31 @@
   }
 
   public static <T extends OpcUaConfig> T extractSharedConfig(IParameterExtractor extractor,
-                                                               T config) {
+                                                              T config) {
 
     String selectedAlternativeConnection =
         extractor.selectedAlternativeInternalId(OPC_HOST_OR_URL.name());
+
     String selectedAlternativeAuthentication =
-        extractor.selectedAlternativeInternalId(ACCESS_MODE.name());
+        extractor.selectedAlternativeInternalId(SharedUserConfiguration.USER_AUTHENTICATION);
+
     List<String> selectedNodeNames =
         extractor.selectedTreeNodesInternalNames(AVAILABLE_NODES.name(), String.class);
-
     config.setSelectedNodeNames(selectedNodeNames);
 
-    boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
-    boolean unauthenticated = selectedAlternativeAuthentication.equals(UNAUTHENTICATED.name());
+    String selectedSecurityMode = extractor.selectedSingleValueInternalName(
+        SharedUserConfiguration.SECURITY_MODE,
+        String.class
+    );
+    String selectedSecurityPolicy = extractor.selectedSingleValue(
+        SharedUserConfiguration.SECURITY_POLICY,
+        String.class
+    );
+    config.setSecurityConfig(new SecurityConfig(
+        MessageSecurityMode.valueOf(selectedSecurityMode),
+        SecurityPolicy.valueOf(selectedSecurityPolicy)));
 
+    boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
     if (useURL) {
       String serverAddress =
           extractor.singleValueParameter(OPC_SERVER_URL.name(), String.class);
@@ -97,15 +112,15 @@
       config.setOpcServerURL(serverAddress + ":" + port);
     }
 
+    boolean unauthenticated = selectedAlternativeAuthentication.equals(
+        SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS
+    );
     if (unauthenticated) {
-      config.setUnauthenticated(true);
+      config.setIdentityConfig(new AnonymousIdentityConfig());
     } else {
       String username = extractor.singleValueParameter(USERNAME.name(), String.class);
       String password = extractor.secretValue(PASSWORD.name());
-
-      config.setUsername(username);
-      config.setPassword(password);
-      config.setUnauthenticated(false);
+      config.setIdentityConfig(new UsernamePasswordIdentityConfig(username, password));
     }
 
     return config;
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
new file mode 100644
index 0000000..ecaad66
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+
+public class AnonymousIdentityConfig implements IdentityConfig {
+
+  @Override
+  public void configureIdentity(OpcUaClientConfigBuilder builder) {
+    builder.setIdentityProvider(new AnonymousProvider());
+  }
+
+  @Override
+  public String toString() {
+    return "anonymous";
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
new file mode 100644
index 0000000..db51fd3
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+
+public interface IdentityConfig {
+
+  void configureIdentity(OpcUaClientConfigBuilder builder);
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
new file mode 100644
index 0000000..825fe1f
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+
+public class UsernamePasswordIdentityConfig implements IdentityConfig {
+
+  private final String username;
+  private final String password;
+
+  public UsernamePasswordIdentityConfig(String username, String password) {
+    this.username = username;
+    this.password = password;
+  }
+
+  @Override
+  public void configureIdentity(OpcUaClientConfigBuilder builder) {
+    builder.setIdentityProvider(new UsernameProvider(username, password));
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s-%S", username, DigestUtils.sha256Hex(password));
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
new file mode 100644
index 0000000..4b132d1
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+
+public class KeyStoreLoader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KeyStoreLoader.class);
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  public KeyStoreLoader load(Environment env,
+                             Path securityDir) throws Exception {
+    var keystore = KeyStore.getInstance(env.getOPcUaKeystoreType().getValueOrDefault());
+    var keystoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
+    var keystorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
+    var keystoreAlias = env.getOpcUaKeystoreAlias().getValueOrDefault();
+    Path serverKeystore = securityDir.resolve(keystoreFile);
+    char[] serverKeyStorePassword = keystorePassword.toCharArray();
+
+    LOG.info("Loading KeyStore at {}", serverKeystore);
+
+    try (InputStream in = Files.newInputStream(serverKeystore)) {
+      keystore.load(in, serverKeyStorePassword);
+    }
+
+    Key clientPrivateKey = keystore.getKey(keystoreAlias, serverKeyStorePassword);
+    if (clientPrivateKey instanceof PrivateKey) {
+      clientCertificate = (X509Certificate) keystore.getCertificate(keystoreAlias);
+
+      clientCertificateChain = Arrays.stream(keystore.getCertificateChain(keystoreAlias))
+          .map(X509Certificate.class::cast)
+          .toArray(X509Certificate[]::new);
+
+      PublicKey serverPublicKey = clientCertificate.getPublicKey();
+      clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey);
+    }
+
+    return this;
+  }
+
+  public X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  public KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
new file mode 100644
index 0000000..b64660e
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class SecurityConfig {
+
+  private final MessageSecurityMode securityMode;
+  private final SecurityPolicy securityPolicy;
+
+  public SecurityConfig(MessageSecurityMode securityMode,
+                        SecurityPolicy securityPolicy) {
+    this.securityMode = securityMode;
+    this.securityPolicy = securityPolicy;
+  }
+
+  public void configureSecurityPolicy(String opcServerUrl,
+                                      List<EndpointDescription> endpoints,
+                                      OpcUaClientConfigBuilder builder)
+      throws SpConfigurationException, URISyntaxException {
+    String host = opcServerUrl.split("://")[1].split(":")[0];
+
+    EndpointDescription tmpEndpoint = endpoints
+        .stream()
+        .filter(e -> e.getSecurityMode() == securityMode)
+        .filter(e -> e.getSecurityPolicyUri().equals(securityPolicy.getUri()))
+        .findFirst()
+        .orElseThrow(() ->
+            new SpConfigurationException("No endpoint available with security mode {} and security policy {}")
+        );
+
+    tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
+
+    if (securityMode != MessageSecurityMode.None) {
+      try {
+        var env = Environments.getEnvironment();
+        var securityDir = Paths.get(env.getOpcUaSecurityDir().getValueOrDefault());
+        var trustListManager = new DefaultTrustListManager(securityDir.resolve("pki").toFile());
+
+        var certificateValidator = new DefaultClientCertificateValidator(trustListManager);
+        var loader = new KeyStoreLoader().load(env, securityDir);
+        builder.setKeyPair(loader.getClientKeyPair());
+        builder.setCertificate(loader.getClientCertificate());
+        builder.setCertificateChain(loader.getClientCertificateChain());
+        builder.setCertificateValidator(certificateValidator);
+      } catch (Exception e) {
+        throw new SpConfigurationException(
+            "Failed to load keystore - check that all required environment variables "
+                + "are defined and the keystore exists",
+            e
+        );
+      }
+    }
+
+    builder.setEndpoint(tmpEndpoint);
+  }
+
+  private EndpointDescription updateEndpointUrl(EndpointDescription original,
+                                                String hostname) throws URISyntaxException {
+
+    URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
+
+    String endpointUrl = String.format("%s://%s:%s%s", uri.getScheme(), hostname, uri.getPort(), uri.getPath());
+
+    return new EndpointDescription(
+        endpointUrl,
+        original.getServer(),
+        original.getServerCertificate(),
+        original.getSecurityMode(),
+        original.getSecurityPolicyUri(),
+        original.getUserIdentityTokens(),
+        original.getTransportProfileUri(),
+        original.getSecurityLevel());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s-%s", securityMode, securityPolicy);
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
new file mode 100644
index 0000000..d4311bf
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import java.util.List;
+
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_MODE;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_POLICY;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION;
+import static org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
+import static org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
+
+public class OpcUaAdapterMigrationV4 implements IAdapterMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        OpcUaAdapter.ID,
+        SpServiceTagPrefix.ADAPTER,
+        3,
+        4
+    );
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
+                                                     IStaticPropertyExtractor extractor) throws RuntimeException {
+    var config = element.getConfig();
+    element.setConfig(migrate(config, extractor));
+
+    return MigrationResult.success(element);
+  }
+
+  public List<StaticProperty> migrate(List<StaticProperty> staticProperties,
+                                      IParameterExtractor extractor) {
+    var securityMode =
+        StaticProperties.singleValueSelection(
+            Labels.withId(SECURITY_MODE),
+            SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new Option(mode.k, mode.v)).toList()
+        );
+    securityMode.getOptions().get(0).setSelected(true);
+
+    var securityPolicy = StaticProperties.singleValueSelection(
+        Labels.withId(SECURITY_POLICY),
+        SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new Option(p.name())).toList()
+    );
+    securityPolicy.getOptions().get(0).setSelected(true);
+
+    boolean anonymous = true;
+    var currentAuthSettings = extractor.selectedAlternativeInternalId(
+        "ACCESS_MODE"
+    );
+    if (currentAuthSettings.equals("USERNAME_GROUP")) {
+      anonymous = false;
+    }
+    var authentication = StaticProperties.alternatives(Labels.withId(USER_AUTHENTICATION),
+        Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
+        Alternatives.from(Labels.withId(USERNAME_GROUP),
+            StaticProperties.group(
+                Labels.withId(USERNAME_GROUP),
+                StaticProperties.stringFreeTextProperty(
+                    Labels.withId(USERNAME)),
+                StaticProperties.secretValue(Labels.withId(PASSWORD))
+            ))
+    );
+    if (anonymous) {
+      authentication.getAlternatives().get(0).setSelected(true);
+    } else {
+      authentication.getAlternatives().get(1).setSelected(true);
+      var username = extractor.singleValueParameter("USERNAME", String.class);
+      var password = extractor.secretValue("PASSWORD");
+      var group = (StaticPropertyGroup) authentication.getAlternatives().get(1).getStaticProperty();
+      ((FreeTextStaticProperty) group.getStaticProperties().get(0)).setValue(username);
+      ((SecretStaticProperty) group.getStaticProperties().get(1)).setValue(password);
+      ((SecretStaticProperty) group.getStaticProperties().get(1)).setEncrypted(false);
+    }
+
+    // remove old authentication property, add new properties for securityMode, policy and authentication options
+    staticProperties.remove(1);
+    staticProperties.add(1, securityMode);
+    staticProperties.add(2, securityPolicy);
+    staticProperties.add(3, authentication);
+
+    return staticProperties;
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
new file mode 100644
index 0000000..bc24f4d
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+
+public class OpcUaSinkMigrationV1 implements IDataSinkMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        OpcUaSink.ID,
+        SpServiceTagPrefix.DATA_SINK,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
+                                                     IDataSinkParameterExtractor extractor) throws RuntimeException {
+    var config = element.getStaticProperties();
+    var migratedConfigs = new OpcUaAdapterMigrationV4().migrate(config, extractor);
+    element.setStaticProperties(migratedConfigs);
+    return MigrationResult.success(element);
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
index 78babe0..265fd80 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
@@ -19,12 +19,13 @@
 package org.apache.streampipes.extensions.connectors.opcua.sink;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.PrimitiveField;
 import org.apache.streampipes.vocabulary.XSD;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -42,7 +43,8 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(OpcUa.class);
 
-  private OpcUaClient opcUaClient;
+  private ConnectedOpcUaClient connectedClient;
+  private final OpcUaConfig opcUaConfig;
   private OpcUaParameters params;
 
   private NodeId node;
@@ -73,24 +75,31 @@
     compatibleDataTypes.put(String.class, new Class[]{String.class});
   }
 
-  public void onInvocation(OpcUaParameters params) throws
+  private final OpcUaClientProvider clientProvider;
+
+  public OpcUa(OpcUaClientProvider clientProvider,
+               OpcUaParameters params) {
+    this.clientProvider = clientProvider;
+    this.params = params;
+    this.opcUaConfig = params.config();
+  }
+
+  public void onInvocation() throws
       SpRuntimeException {
 
     try {
-      this.params = params;
-      this.node = NodeId.parse(params.getSelectedNode());
-      opcUaClient = new SpOpcUaClient<>(params.getConfig()).getClient();
-      opcUaClient.connect().get();
+      this.node = NodeId.parse(params.selectedNode());
+      this.connectedClient = clientProvider.getClient(opcUaConfig);
 
     } catch (Exception e) {
-      throw new SpRuntimeException("Could not connect to OPC-UA server: " + params.getConfig().getOpcServerURL());
+      throw new SpRuntimeException("Could not connect to OPC-UA server: " + params.config().getOpcServerURL());
     }
 
     // check whether input data type and target data type are compatible
     try {
-      Variant value = opcUaClient.getAddressSpace().getVariableNode(node).readValue().getValue();
+      Variant value = this.connectedClient.getClient().getAddressSpace().getVariableNode(node).readValue().getValue();
       targetDataType = value.getValue().getClass();
-      sourceDataType = XSDMatchings.get(params.getMappingPropertyType());
+      sourceDataType = XSDMatchings.get(params.mappingPropertyType());
       if (!sourceDataType.equals(targetDataType)) {
         if (Arrays.stream(compatibleDataTypes.get(sourceDataType)).noneMatch(dt -> dt.equals(targetDataType))) {
           throw new SpRuntimeException("Data Type of event of target node are not compatible");
@@ -107,40 +116,45 @@
     Variant v = getValue(inputEvent);
 
     if (v == null) {
-      LOG.error("Mapping property type: " + this.params.getMappingPropertyType() + " is not supported");
+      LOG.error("Mapping property type: " + this.params.mappingPropertyType() + " is not supported");
     } else {
 
       DataValue value = new DataValue(v);
-      CompletableFuture<StatusCode> f = opcUaClient.writeValue(node, value);
+      CompletableFuture<StatusCode> f = this.connectedClient.getClient().writeValue(node, value);
 
       try {
         StatusCode status = f.get();
         if (status.isBad()) {
           if (status.getValue() == 0x80740000L) {
-            LOG.error("Type missmatch! Tried to write value of type: " + this.params.getMappingPropertyType()
+            LOG.error("Type missmatch! Tried to write value of type {} ", this.params.mappingPropertyType()
                 + " but server did not accept this");
           } else if (status.getValue() == 0x803B0000L) {
             LOG.error("Wrong access level. Not allowed to write to nodes");
           }
           LOG.error(
-              "Value: " + value.getValue().toString() + " could not be written to node Id: "
-                  + node.getIdentifier() + " on " + "OPC-UA server: " + params.getConfig().getOpcServerURL());
+              "Value: {} could not be written to node Id {} on OPC-UA server {}",
+              value.getValue().toString(),
+              node.getIdentifier(),
+              params.config().getOpcServerURL());
         }
       } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Exception: Value: " + value.getValue().toString() + " could not be written to node Id: "
-            + node.getIdentifier() + " on " + "OPC-UA server: " + params.getConfig().getOpcServerURL());
+        LOG.error(
+            "Exception: Value {} could not be written to node Id {} on OPC_UA server {}",
+            value.getValue().toString(),
+            node.getIdentifier(),
+            params.config().getOpcServerURL());
       }
     }
   }
 
   public void onDetach() throws SpRuntimeException {
-    opcUaClient.disconnect();
+    clientProvider.releaseClient(opcUaConfig);
   }
 
   private Variant getValue(Event inputEvent) {
     Variant result = null;
     PrimitiveField propertyPrimitive =
-        inputEvent.getFieldBySelector(this.params.getMappingPropertySelector()).getAsPrimitive();
+        inputEvent.getFieldBySelector(this.params.mappingPropertySelector()).getAsPrimitive();
 
     if (targetDataType.equals(Integer.class)) {
       result = new Variant(propertyPrimitive.getAsInt());
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
index 15906af..438bf15 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
@@ -20,36 +20,8 @@
 
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 
-public final class OpcUaParameters {
-  private final String selectedNode;
-  private final String mappingPropertySelector;
-  private final String mappingPropertyType;
-
-  private final OpcUaConfig config;
-
-  public OpcUaParameters(OpcUaConfig config,
-                         String mappingPropertySelector,
-                         String mappingPropertyType,
-                         String selectedNode) {
-    this.config = config;
-    this.mappingPropertySelector = mappingPropertySelector;
-    this.mappingPropertyType = mappingPropertyType;
-    this.selectedNode = selectedNode;
-  }
-
-  public String getSelectedNode() {
-    return selectedNode;
-  }
-
-  public String getMappingPropertySelector() {
-    return mappingPropertySelector;
-  }
-
-  public String getMappingPropertyType() {
-    return mappingPropertyType;
-  }
-
-  public OpcUaConfig getConfig() {
-    return config;
-  }
+public record OpcUaParameters(OpcUaConfig config,
+                              String mappingPropertySelector,
+                              String mappingPropertyType,
+                              String selectedNode) {
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
index 50a6069..b81f20d 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
@@ -26,6 +26,7 @@
 import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
@@ -45,11 +46,18 @@
 
 public class OpcUaSink implements IStreamPipesDataSink, SupportsRuntimeConfig {
 
+  public static final String ID = "org.apache.streampipes.sinks.databases.jvm.opcua";
+
   private OpcUa opcUa;
+  private final OpcUaClientProvider clientProvider;
+
+  public OpcUaSink(OpcUaClientProvider clientProvider) {
+    this.clientProvider = clientProvider;
+  }
 
   @Override
   public IDataSinkConfiguration declareConfig() {
-    var builder = DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua", 0)
+    var builder = DataSinkBuilder.create(ID, 0)
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .category(DataSinkType.FORWARD)
@@ -62,7 +70,7 @@
     SharedUserConfiguration.appendSharedOpcUaConfig(builder, false);
 
     return DataSinkConfiguration.create(
-        OpcUaSink::new,
+        () -> new OpcUaSink(clientProvider),
         builder.build()
     );
   }
@@ -89,8 +97,8 @@
         config.getSelectedNodeNames().get(0)
     );
 
-    this.opcUa = new OpcUa();
-    this.opcUa.onInvocation(params);
+    this.opcUa = new OpcUa(clientProvider, params);
+    this.opcUa.onInvocation();
   }
 
   @Override
@@ -106,6 +114,6 @@
   @Override
   public StaticProperty resolveConfiguration(String staticPropertyInternalName,
                                              IStaticPropertyExtractor extractor) throws SpConfigurationException {
-    return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+    return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, extractor);
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
index aa49a7a..8cd7640 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
@@ -25,8 +25,9 @@
 import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
 import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
@@ -37,7 +38,7 @@
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
@@ -78,7 +79,8 @@
    * @throws AdapterException
    * @throws ParseException
    */
-  public static GuessSchema getSchema(IAdapterParameterExtractor extractor)
+  public static GuessSchema getSchema(OpcUaClientProvider clientProvider,
+                                      IAdapterParameterExtractor extractor)
       throws AdapterException, ParseException {
     var builder = GuessSchemaBuilder.create();
     EventSchema eventSchema = new EventSchema();
@@ -86,14 +88,13 @@
     Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
     List<EventProperty> allProperties = new ArrayList<>();
 
-    SpOpcUaClient<OpcUaConfig> spOpcUaClient = new SpOpcUaClient<>(
-        SpOpcUaConfigExtractor.extractSharedConfig(extractor.getStaticPropertyExtractor(), new OpcUaConfig())
+    var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
+        extractor.getStaticPropertyExtractor(), new OpcUaConfig()
     );
-
     try {
-      spOpcUaClient.connect();
+      var connectedClient = clientProvider.getClient(opcUaConfig);
       OpcUaNodeBrowser nodeBrowser =
-          new OpcUaNodeBrowser(spOpcUaClient.getClient(), spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
       List<OpcNode> selectedNodes = nodeBrowser.findNodes();
 
       if (!selectedNodes.isEmpty()) {
@@ -116,12 +117,12 @@
       var nodeIds = selectedNodes.stream()
                                  .map(OpcNode::getNodeId)
                                  .collect(Collectors.toList());
-      var response = spOpcUaClient.getClient()
+      var response = connectedClient.getClient()
                                   .readValues(0, TimestampsToReturn.Both, nodeIds);
 
       var returnValues = response.get();
 
-      spOpcUaClient.disconnect();
+      //clientProvider.releaseClient(opcUaConfig);
 
       makeEventPreview(selectedNodes, eventPreview, fieldStatusInfos, returnValues);
 
@@ -129,7 +130,9 @@
     } catch (Exception e) {
       throw new AdapterException("Could not guess schema for opc node:  " + e.getMessage(), e);
     } finally {
-      spOpcUaClient.disconnect();
+      // TODO
+      //spOpcUaClient.disconnect();
+      clientProvider.releaseClient(opcUaConfig);
     }
 
     eventSchema.setEventProperties(allProperties);
@@ -172,7 +175,8 @@
    * @param parameterExtractor to extract parameters from the OPC UA config
    * @return {@code List<Option>} with available node names for the given OPC UA configuration
    */
-  public static RuntimeResolvableTreeInputStaticProperty resolveConfig(String internalName,
+  public static RuntimeResolvableTreeInputStaticProperty resolveConfig(OpcUaClientProvider clientProvider,
+                                                                       String internalName,
                                                                        IStaticPropertyExtractor parameterExtractor)
       throws SpConfigurationException {
 
@@ -181,19 +185,18 @@
     // access mode and host/url have to be selected
     try {
       parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
-      parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+      parameterExtractor.selectedSingleValueInternalName(SharedUserConfiguration.SECURITY_MODE, String.class);
+      parameterExtractor.selectedSingleValue(SharedUserConfiguration.SECURITY_POLICY, String.class);
     } catch (NullPointerException nullPointerException) {
       return config;
     }
 
-    SpOpcUaClient spOpcUaClient = new SpOpcUaClient(
-        SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new OpcUaConfig())
-    );
+    var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new OpcUaConfig());
 
     try {
-      spOpcUaClient.connect();
+      var connectedClient = clientProvider.getClient(opcUaConfig);
       OpcUaNodeBrowser nodeBrowser =
-          new OpcUaNodeBrowser(spOpcUaClient.getClient(), spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
 
       var nodes = nodeBrowser.buildNodeTreeFromOrigin(config.getNextBaseNodeToResolve());
       if (Objects.isNull(config.getNextBaseNodeToResolve())) {
@@ -204,7 +207,7 @@
 
       if (!config.getSelectedNodesInternalNames().isEmpty()) {
         config.setSelectedNodesInternalNames(
-            filterMissingNodes(spOpcUaClient.getClient(), config.getSelectedNodesInternalNames())
+            filterMissingNodes(connectedClient.getClient(), config.getSelectedNodesInternalNames())
         );
       }
 
@@ -215,13 +218,15 @@
     } catch (ExecutionException | InterruptedException | URISyntaxException e) {
       throw new SpConfigurationException("Could not connect to the OPC UA server with the provided settings", e);
     } finally {
-      if (spOpcUaClient.getClient() != null) {
-        spOpcUaClient.disconnect();
-      }
+      clientProvider.releaseClient(opcUaConfig);
+      // TODO
+//      if (spOpcUaClient.getClient() != null) {
+//        spOpcUaClient.disconnect();
+//      }
     }
   }
 
-  public static List<String> filterMissingNodes(OpcUaClient opcUaClient,
+  public static List<String> filterMissingNodes(UaClient opcUaClient,
                                                 List<String> selectedNodes) {
     return selectedNodes.stream().filter(selectedNode -> {
       try {
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
new file mode 100644
index 0000000..576f3ee
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.utils;
+
+import org.apache.streampipes.model.Tuple2;
+
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
+import java.util.List;
+
+public class SecurityUtils {
+
+  public static List<Tuple2<String, String>> getAvailableSecurityModes() {
+    return List.of(
+        new Tuple2<>("None", MessageSecurityMode.None.name()),
+        new Tuple2<>("Sign", MessageSecurityMode.Sign.name()),
+        new Tuple2<>("Sign & Encrypt", MessageSecurityMode.SignAndEncrypt.name())
+    );
+  }
+
+  public static List<SecurityPolicy> getAvailableSecurityPolicies() {
+    return List.of(
+        SecurityPolicy.None,
+        SecurityPolicy.Basic128Rsa15,
+        SecurityPolicy.Basic256,
+        SecurityPolicy.Basic256Sha256,
+        SecurityPolicy.Aes128_Sha256_RsaOaep,
+        SecurityPolicy.Aes256_Sha256_RsaPss
+    );
+  }
+}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
index 079243a..185776e 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
@@ -26,7 +26,40 @@
 
 ## Description
 
-Reads values from an OPC-UA server repeatedly
+This adapter reads node values from an OPC-UA server.
+The adapter supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode, 
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+ā”œā”€ pki/
+ā”‚  ā”œā”€ issuers/
+ā”‚  ā”œā”€ rejected/
+ā”‚  ā”œā”€ trusted/
+ā”‚  ā”‚  ā”œā”€ certs/
+ā”‚  ā”‚  ā”œā”€ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
 
 ***
 
@@ -40,7 +73,15 @@
 
 Duration of the polling interval in seconds
 
-### Anonymous vs. Username/Password
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
 
 Choose whether you want to connect anonymously or authenticate using your credentials.
 
@@ -54,14 +95,6 @@
 &nbsp;&nbsp;&nbsp;&nbsp; **URL**: Specify the server's full `URL` (including port), can be with our without leading `opc.tcp://`<br/>
 &nbsp;&nbsp;&nbsp;&nbsp; **Host/Port**: Insert the `host` address (with or without leading `opc.tcp://`) and the `port`<br/>
 
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both valid.
-
 ### Available Nodes
 
 Shows all available nodes once namespace index and node ID are given.
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index 995045e..15f54c5 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -38,10 +38,22 @@
 OPC_SERVER_PORT.title=Port
 OPC_SERVER_PORT.description=Example: 4840
 
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
+
 ACCESS_MODE.title=Security Level
 ACCESS_MODE.description=Select the OPC UA security level for the connection
 
-USERNAME_GROUP.title=Sign (username & password)
+USERNAME_GROUP.title=Username & Password
 USERNAME_GROUP.description=
 
 UNAUTHENTICATED.title=None
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
index 2722146..ee28512 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
@@ -26,7 +26,40 @@
 
 ## Description
 
-Allows to write events to an OPC-UA server.
+This data sink can be used to write values to an OPC-UA server.
+The sink supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+ā”œā”€ pki/
+ā”‚  ā”œā”€ issuers/
+ā”‚  ā”œā”€ rejected/
+ā”‚  ā”œā”€ trusted/
+ā”‚  ā”‚  ā”œā”€ certs/
+ā”‚  ā”‚  ā”œā”€ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
 
 ***
 
@@ -46,6 +79,21 @@
 
 The port of the OPC-UA server.
 
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
+
+Choose whether you want to connect anonymously or authenticate using your credentials.
+
+&nbsp;&nbsp;&nbsp;&nbsp; **Anonymous**: No further information required <br/>
+&nbsp;&nbsp;&nbsp;&nbsp; **Username/Password**: Insert your `username` and `password` to access the OPC UA server
+
 ### Namespace Index
 
 The namespace index in which the node should be written
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
index 7e78d16..b26c09c 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
@@ -56,3 +56,15 @@
 
 MAPPING_PROPERY.title=Field to write
 MAPPING_PROPERY.description=The field that should be written to the OPC-UA server
+
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
index 4dd462b..6d82bcf 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
+++ b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/migration/config/OpcUaAdapterVersionedConfig.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.extensions.connectors.opcua.migration.config;
 
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -52,7 +53,7 @@
 public class OpcUaAdapterVersionedConfig {
 
   public static AdapterDescription getOpcUaAdapterDescriptionV1(){
-    var builder = AdapterConfigurationBuilder.create(ID, 1, OpcUaAdapter::new)
+    var builder = AdapterConfigurationBuilder.create(ID, 1, () -> new OpcUaAdapter(new OpcUaClientProvider()))
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
diff --git a/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml b/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
index b7730e2..d2aabf0 100644
--- a/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-iiot/pom.xml
@@ -198,6 +198,14 @@
             <artifactId>classindex</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk18on</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcutil-jdk18on</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.checkerframework</groupId>
             <artifactId>checker-qual</artifactId>
         </dependency>
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index 88cede6..e487c1a 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -229,6 +229,10 @@
             <artifactId>classindex</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk18on</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.checkerframework</groupId>
             <artifactId>checker-qual</artifactId>
         </dependency>
diff --git a/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml b/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
index b616216..5e93d4ea 100644
--- a/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-iiot-minimal/pom.xml
@@ -163,6 +163,10 @@
             <artifactId>classindex</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk18on</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.checkerframework</groupId>
             <artifactId>checker-qual</artifactId>
         </dependency>
diff --git a/ui/cypress/support/utils/connect/OpcUaUtils.ts b/ui/cypress/support/utils/connect/OpcUaUtils.ts
index 066cacf..31d3f7f 100644
--- a/ui/cypress/support/utils/connect/OpcUaUtils.ts
+++ b/ui/cypress/support/utils/connect/OpcUaUtils.ts
@@ -89,7 +89,8 @@
         }
 
         builder
-            .addInput('radio', 'access_mode-none', '')
+            .addInput('radio', 'securitymode-none', '')
+            .addInput('radio', 'userauthentication-anonymous', '')
             .addInput('radio', 'opc_host_or_url-url', '')
             .addInput(
                 'input',
diff --git a/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts b/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
similarity index 97%
rename from ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts
rename to ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
index 96d2656..fc5aa64 100644
--- a/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.spec.ts
+++ b/ui/cypress/tests/connect/opcua/opcAdapterConfiguration.smoke.spec.ts
@@ -160,7 +160,8 @@
             'undefined-pull-mode-group-0-PULLING_INTERVAL-0',
             '1000',
         )
-        .addInput('radio', 'access_mode-none', '')
+        .addInput('radio', 'securitymode-none', '')
+        .addInput('radio', 'userauthentication-anonymous', '')
         .addInput('radio', 'opc_host_or_url-url', '')
         .addInput(
             'input',
diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.html b/ui/src/app/connect/components/configuration-group/configuration-group.component.html
index 68569ad..282912a 100644
--- a/ui/src/app/connect/components/configuration-group/configuration-group.component.html
+++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.html
@@ -31,8 +31,10 @@
                 [adapterId]="adapterId"
                 [parentForm]="configurationGroup"
                 [fieldName]="config.internalName"
-                (updateEmitter)="triggerUpdate($event)"
-                [completedStaticProperty]="completedStaticProperty"
+                [completedConfigurations]="completedConfigurations"
+                (completedConfigurationsEmitter)="
+                    updateCompletedConfiguration($event)
+                "
             >
             </sp-app-static-property>
         </div>
diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
index 1e109be..e87c47c 100644
--- a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
+++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts
@@ -16,20 +16,21 @@
  *
  */
 
-import { Component, Input } from '@angular/core';
+import { Component, Input, OnInit } from '@angular/core';
 import { UntypedFormGroup } from '@angular/forms';
 import {
     ExtensionDeploymentConfiguration,
     StaticPropertyUnion,
 } from '@streampipes/platform-services';
 import { ConfigurationInfo } from '../../model/ConfigurationInfo';
+import { StaticPropertyUtilService } from '../../../core-ui/static-properties/static-property-util.service';
 
 @Component({
     selector: 'sp-configuration-group',
     templateUrl: './configuration-group.component.html',
     styleUrls: ['./configuration-group.component.scss'],
 })
-export class ConfigurationGroupComponent {
+export class ConfigurationGroupComponent implements OnInit {
     @Input() configurationGroup: UntypedFormGroup;
 
     @Input() adapterId: string;
@@ -38,11 +39,22 @@
 
     @Input() deploymentConfiguration: ExtensionDeploymentConfiguration;
 
-    completedStaticProperty: ConfigurationInfo;
+    completedConfigurations: ConfigurationInfo[] = [];
 
-    constructor() {}
+    constructor(private staticPropertyUtils: StaticPropertyUtilService) {}
 
-    triggerUpdate(configurationInfo: ConfigurationInfo) {
-        this.completedStaticProperty = { ...configurationInfo };
+    ngOnInit() {
+        this.completedConfigurations =
+            this.staticPropertyUtils.initializeCompletedConfigurations(
+                this.configuration,
+            );
+    }
+
+    updateCompletedConfiguration(configurationInfo: ConfigurationInfo) {
+        this.staticPropertyUtils.updateCompletedConfiguration(
+            configurationInfo,
+            this.completedConfigurations,
+        );
+        this.completedConfigurations = [...this.completedConfigurations];
     }
 }
diff --git a/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts b/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
index bc61dcd..2fc1164 100644
--- a/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
+++ b/ui/src/app/core-ui/static-properties/base/abstract-static-property.ts
@@ -21,10 +21,11 @@
     StaticProperty,
     StaticPropertyUnion,
 } from '@streampipes/platform-services';
-import { Directive, EventEmitter, Input, Output } from '@angular/core';
+import { Directive, EventEmitter, inject, Input, Output } from '@angular/core';
 import { UntypedFormGroup } from '@angular/forms';
 import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo';
 import { InvocablePipelineElementUnion } from '../../../editor/model/editor.model';
+import { StaticPropertyUtilService } from '../static-property-util.service';
 
 @Directive()
 // eslint-disable-next-line @angular-eslint/directive-class-suffix
@@ -53,14 +54,21 @@
     @Input()
     displayRecommended: boolean;
 
-    @Output() updateEmitter: EventEmitter<ConfigurationInfo> =
+    @Input()
+    completedConfigurations: ConfigurationInfo[];
+
+    @Output()
+    completedConfigurationsEmitter: EventEmitter<ConfigurationInfo> =
         new EventEmitter();
 
+    staticPropertyUtils = inject(StaticPropertyUtilService);
+
     constructor() {}
 
-    emitUpdate(valid?: boolean) {
-        this.updateEmitter.emit(
-            new ConfigurationInfo(this.staticProperty.internalName, valid),
-        );
+    applyCompletedConfiguration(valid?: boolean) {
+        this.completedConfigurationsEmitter.emit({
+            staticPropertyInternalName: this.staticProperty.internalName,
+            configured: valid,
+        });
     }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
index 7ae838b..a68765e 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.html
@@ -61,6 +61,9 @@
                 <div style="padding: 0 10px">
                     <sp-app-static-property
                         [adapterId]="adapterId"
+                        [completedConfigurations]="
+                            completedAlternativeConfigurations
+                        "
                         [deploymentConfiguration]="deploymentConfiguration"
                         [eventSchemas]="eventSchemas"
                         [parentForm]="parentForm"
@@ -74,7 +77,9 @@
                         "
                         [staticProperty]="alternative.staticProperty"
                         [displayRecommended]="displayRecommended"
-                        (updateEmitter)="handleConfigurationUpdate($event)"
+                        (completedConfigurationsEmitter)="
+                            handleConfigurationUpdate($event)
+                        "
                         class="test fullWidth"
                     >
                     </sp-app-static-property>
diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
index 6e4c668..886b67e 100644
--- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts
@@ -44,13 +44,12 @@
     @Input()
     deploymentConfiguration: ExtensionDeploymentConfiguration;
 
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
+    // dependentStaticPropertyIds: Map<string, boolean> = new Map<
+    //     string,
+    //     boolean
+    // >();
 
-    completedStaticProperty: ConfigurationInfo;
-    dependentStaticPropertyIds: Map<string, boolean> = new Map<
-        string,
-        boolean
-    >();
+    completedAlternativeConfigurations: ConfigurationInfo[] = [];
 
     constructor(private changeDetectorRef: ChangeDetectorRef) {
         super();
@@ -59,10 +58,16 @@
     ngOnInit() {
         this.staticProperty.alternatives.forEach(al => {
             if (al.staticProperty) {
-                const configuration = al.staticProperty.internalName;
-                this.dependentStaticPropertyIds.set(configuration, false);
+                this.completedAlternativeConfigurations.push({
+                    staticPropertyInternalName: al.staticProperty.internalName,
+                    configured: false,
+                });
             }
         });
+        if (!this.staticProperty.alternatives.some(a => a.selected)) {
+            this.staticProperty.alternatives[0].selected = true;
+            this.checkFireCompleted(this.staticProperty.alternatives[0]);
+        }
     }
 
     radioSelectionChange(event) {
@@ -75,21 +80,23 @@
     }
 
     handleConfigurationUpdate(configurationInfo: ConfigurationInfo) {
-        this.dependentStaticPropertyIds.set(
-            configurationInfo.staticPropertyInternalName,
-            configurationInfo.configured,
+        this.staticPropertyUtils.updateCompletedConfiguration(
+            configurationInfo,
+            this.completedAlternativeConfigurations,
         );
         if (this.alternativeCompleted()) {
-            this.completedStaticProperty = { ...configurationInfo };
-            this.emitUpdate(true);
+            this.completedAlternativeConfigurations = [
+                ...this.completedAlternativeConfigurations,
+            ];
+            this.applyCompletedConfiguration(true);
         } else {
-            this.emitUpdate();
+            this.applyCompletedConfiguration(false);
         }
     }
 
     checkFireCompleted(alternative: StaticPropertyAlternative) {
         if (alternative.selected && alternative.staticProperty === null) {
-            this.emitUpdate(true);
+            this.applyCompletedConfiguration(true);
         }
     }
 
@@ -102,9 +109,11 @@
                     if (al.staticProperty === null) {
                         return false;
                     } else {
-                        return this.dependentStaticPropertyIds.get(
-                            al.staticProperty.internalName,
-                        );
+                        return this.completedAlternativeConfigurations.find(
+                            c =>
+                                c.staticPropertyInternalName ===
+                                al.staticProperty.internalName,
+                        ).configured;
                     }
                 }
             }) !== undefined
diff --git a/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts b/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
index c28ebac..53d3de8 100644
--- a/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-any-input/static-any-input.component.ts
@@ -25,16 +25,7 @@
     templateUrl: './static-any-input.component.html',
     styleUrls: ['./static-any-input.component.scss'],
 })
-export class StaticAnyInputComponent
-    extends AbstractStaticPropertyRenderer<AnyStaticProperty>
-    implements OnInit
-{
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
-    ngOnInit() {
-        this.inputEmitter.emit(true);
-    }
-
+export class StaticAnyInputComponent extends AbstractStaticPropertyRenderer<AnyStaticProperty> {
     select(elementId: string) {
         this.staticProperty.options
             .filter(option => option.elementId === elementId)
diff --git a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
index 21df719..ab8e104 100644
--- a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
+++ b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.html
@@ -27,7 +27,7 @@
             formControlName="{{ fieldName }}"
             [style.background]="staticProperty.selectedColor"
             required
-            (blur)="emitUpdate()"
+            (blur)="checkCompleted()"
             [cpPresetColors]="presetColors"
         />
     </div>
diff --git a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
index c2ddd31..8717ea8 100644
--- a/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-color-picker/static-color-picker.component.ts
@@ -36,10 +36,6 @@
         super();
     }
 
-    inputValue: String;
-    hasInput: Boolean;
-    colorPickerForm: UntypedFormGroup;
-
     presetColors: any[] = [
         '#39B54A',
         '#1B1464',
@@ -58,17 +54,14 @@
         this.enableValidators();
     }
 
-    emitUpdate() {
-        this.updateEmitter.emit(
-            new ConfigurationInfo(
-                this.staticProperty.internalName,
+    checkCompleted() {
+        this.applyCompletedConfiguration(
+            this.staticPropertyUtil.asColorPickerStaticProperty(
+                this.staticProperty,
+            ).selectedColor &&
                 this.staticPropertyUtil.asColorPickerStaticProperty(
                     this.staticProperty,
-                ).selectedColor &&
-                    this.staticPropertyUtil.asColorPickerStaticProperty(
-                        this.staticProperty,
-                    ).selectedColor !== '',
-            ),
+                ).selectedColor !== '',
         );
     }
 
diff --git a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
index 600058a..69ed38b 100644
--- a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts
@@ -38,8 +38,6 @@
     extends AbstractValidatedStaticPropertyRenderer<FileStaticProperty>
     implements OnInit
 {
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
     public chooseExistingFileControl = new UntypedFormControl();
 
     dialogRef: MatDialogRef<FileRenameDialogComponent>;
@@ -99,7 +97,7 @@
                         fmi => fmi.filename === filenameToSelect,
                     );
                     this.selectOption(this.selectedFile);
-                    this.emitUpdate(true);
+                    this.applyCompletedConfiguration(true);
                     this.parentForm.controls[this.fieldName].setValue(
                         this.selectedFile,
                     );
@@ -114,7 +112,7 @@
                     if (this.fileMetadata.length > 0) {
                         this.selectedFile = this.fileMetadata[0];
                         this.selectOption(this.selectedFile);
-                        this.emitUpdate(true);
+                        this.applyCompletedConfiguration(true);
                         this.parentForm.controls[this.fieldName].setValue(
                             this.selectedFile,
                         );
@@ -172,9 +170,7 @@
         this.staticProperty.locationPath = fileMetadata.filename;
         const valid: boolean =
             fileMetadata.filename !== '' || fileMetadata.filename !== undefined;
-        this.updateEmitter.emit(
-            new ConfigurationInfo(this.staticProperty.internalName, valid),
-        );
+        this.applyCompletedConfiguration(valid);
     }
 
     displayFn(fileMetadata: FileMetadata) {
diff --git a/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts b/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
index 78be373..e71552b 100644
--- a/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-free-input/static-free-input.component.ts
@@ -97,9 +97,7 @@
             this.staticProperty.value !== undefined &&
             this.staticProperty.value !== '' &&
             this.staticProperty.value !== null;
-        this.updateEmitter.emit(
-            new ConfigurationInfo(this.staticProperty.internalName, valid),
-        );
+        this.applyCompletedConfiguration(valid);
     }
 
     onStatusChange(status: any) {}
diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
index fb77d38..d321f4c 100644
--- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
+++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html
@@ -28,7 +28,7 @@
         [eventSchemas]="eventSchemas"
         [staticProperty]="property"
         [displayRecommended]="displayRecommended"
-        (updateEmitter)="handleConfigurationUpdate($event)"
+        (completedConfigurationsEmitter)="handleConfigurationUpdate($event)"
         class="test fullWidth"
     >
     </sp-app-static-property>
diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
index 45da778..73141ea 100644
--- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts
@@ -36,8 +36,6 @@
     @Input()
     deploymentConfiguration: ExtensionDeploymentConfiguration;
 
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
     dependentStaticProperties: Map<string, boolean> = new Map<
         string,
         boolean
@@ -53,9 +51,9 @@
                 v => v === true,
             )
         ) {
-            this.emitUpdate(true);
+            this.applyCompletedConfiguration(true);
         } else {
-            this.emitUpdate(false);
+            this.applyCompletedConfiguration(false);
         }
     }
 
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
index bd1e95d..c67a1e8 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-nary/static-mapping-nary.component.ts
@@ -30,8 +30,6 @@
     extends StaticMappingComponent<MappingPropertyNary>
     implements OnInit
 {
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
     constructor(private displayRecommendedPipe: DisplayRecommendedPipe) {
         super();
     }
@@ -56,7 +54,6 @@
                 }
             });
         }
-        this.inputEmitter.emit(true);
     }
 
     selectOption(property: any, $event) {
@@ -112,6 +109,6 @@
     onStatusChange(status: any) {}
 
     onValueChange(value: any) {
-        this.emitUpdate();
+        this.applyCompletedConfiguration();
     }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
index d414502..db5cf86 100644
--- a/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-mapping-unary/static-mapping-unary.component.ts
@@ -30,8 +30,6 @@
     extends StaticMappingComponent<MappingPropertyUnary>
     implements OnInit
 {
-    @Output() inputEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
-
     constructor() {
         super();
     }
@@ -41,7 +39,7 @@
         if (!this.staticProperty.selectedProperty) {
             this.staticProperty.selectedProperty =
                 this.availableProperties[0].propertySelector;
-            this.emitUpdate(true);
+            this.applyCompletedConfiguration(true);
         }
         this.addValidator(
             this.staticProperty.selectedProperty,
@@ -54,6 +52,6 @@
 
     onValueChange(value: any) {
         this.staticProperty.selectedProperty = value;
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts b/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
index 1d1b835..f05537f 100644
--- a/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-one-of-input/static-one-of-input.component.ts
@@ -52,7 +52,7 @@
             ).elementId;
         }
         this.inputEmitter.emit(true);
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
         this.parentForm.updateValueAndValidity();
     }
 
@@ -71,6 +71,6 @@
             option => option.elementId === id,
         ).selected = true;
         this.inputEmitter.emit(true);
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-property-util.service.ts b/ui/src/app/core-ui/static-properties/static-property-util.service.ts
index b85882c..533cff0 100644
--- a/ui/src/app/core-ui/static-properties/static-property-util.service.ts
+++ b/ui/src/app/core-ui/static-properties/static-property-util.service.ts
@@ -37,11 +37,53 @@
     StaticPropertyGroup,
 } from '@streampipes/platform-services';
 import { IdGeneratorService } from '../../core-services/id-generator/id-generator.service';
+import { ConfigurationInfo } from '../../connect/model/ConfigurationInfo';
 
 @Injectable({ providedIn: 'root' })
 export class StaticPropertyUtilService {
     constructor(private idGeneratorService: IdGeneratorService) {}
 
+    public initializeCompletedConfigurations(
+        configs: StaticProperty[],
+    ): ConfigurationInfo[] {
+        return configs
+            .filter(config => !config.optional)
+            .map(config => {
+                return {
+                    staticPropertyInternalName: config.internalName,
+                    configured: false,
+                };
+            });
+    }
+
+    public allDependenciesSatisfied(
+        dependsOn: string[],
+        completedConfigs: ConfigurationInfo[],
+    ) {
+        if (dependsOn?.length > 0) {
+            return dependsOn.every(dependency =>
+                completedConfigs.some(
+                    config =>
+                        config.staticPropertyInternalName === dependency &&
+                        config.configured,
+                ),
+            );
+        } else {
+            return true;
+        }
+    }
+
+    public updateCompletedConfiguration(
+        completedConfig: ConfigurationInfo,
+        completedConfigs: ConfigurationInfo[],
+    ) {
+        completedConfigs.find(
+            c =>
+                c.staticPropertyInternalName ===
+                completedConfig.staticPropertyInternalName,
+        ).configured = completedConfig.configured;
+    }
+
     public clone(val: StaticProperty) {
         let clone;
         const id = this.idGeneratorService.generatePrefixedId();
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html
index 3f43c9b..fe6c2bf 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.html
+++ b/ui/src/app/core-ui/static-properties/static-property.component.html
@@ -37,78 +37,96 @@
         <div fxFlex="100">
             <sp-static-code-input
                 *ngIf="isCodeInputStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [eventSchemas]="eventSchemas"
                 [fieldName]="fieldName"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-static-code-input>
 
             <sp-app-static-secret-input
                 *ngIf="isSecretStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-secret-input>
 
             <sp-app-static-free-input
                 *ngIf="isFreeTextStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [eventSchemas]="eventSchemas"
                 [fieldName]="fieldName"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-free-input>
 
             <sp-static-file-input
                 *ngIf="isFileStaticProperty(staticProperty)"
-                (inputEmitter)="valueChange($event)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
-                (updateEmitter)="emitUpdate($event)"
                 [adapterId]="adapterId"
             >
             </sp-static-file-input>
 
             <sp-app-static-color-picker
                 *ngIf="isColorPickerStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-color-picker>
 
             <sp-app-static-runtime-resolvable-any-input
                 *ngIf="isRuntimeResolvableAnyStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [staticProperty]="staticProperty"
                 [staticProperties]="staticProperties"
                 [eventSchemas]="eventSchemas"
                 [pipelineElement]="pipelineElement"
                 [parentForm]="parentForm"
-                [completedStaticProperty]="completedStaticProperty"
                 [adapterId]="adapterId"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-runtime-resolvable-any-input>
 
             <sp-app-static-runtime-resolvable-oneof-input
                 *ngIf="isRuntimeResolvableOneOfStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [deploymentConfiguration]="deploymentConfiguration"
-                [completedStaticProperty]="completedStaticProperty"
                 [pipelineElement]="pipelineElement"
                 [eventSchemas]="eventSchemas"
                 [staticProperty]="staticProperty"
                 [parentForm]="parentForm"
                 [staticProperties]="staticProperties"
                 [adapterId]="adapterId"
-                (updateEmitter)="emitUpdate($event)"
             ></sp-app-static-runtime-resolvable-oneof-input>
 
             <sp-app-static-any-input
@@ -116,7 +134,10 @@
                     isAnyStaticProperty(staticProperty) &&
                     !isRuntimeResolvableAnyStaticProperty(staticProperty)
                 "
-                (inputEmitter)="valueChange($event)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [fieldName]="fieldName"
                 [staticProperty]="staticProperty"
             >
@@ -127,8 +148,10 @@
                     isOneOfStaticProperty(staticProperty) &&
                     !isRuntimeResolvableOneOfStaticProperty(staticProperty)
                 "
-                (inputEmitter)="valueChange($event)"
-                (updateEmitter)="emitUpdate($event)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [parentForm]="parentForm"
                 [staticProperty]="staticProperty"
             >
@@ -136,30 +159,38 @@
 
             <sp-app-static-mapping-unary
                 *ngIf="isMappingPropertyUnary(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [eventSchemas]="eventSchemas"
                 [staticProperty]="staticProperty"
                 [displayRecommended]="displayRecommended"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
-                (inputEmitter)="valueChange($event)"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-mapping-unary>
 
             <sp-app-static-mapping-nary
                 *ngIf="isMappingNaryProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [eventSchemas]="eventSchemas"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
                 [staticProperty]="staticProperty"
                 [displayRecommended]="displayRecommended"
-                (inputEmitter)="valueChange($event)"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-mapping-nary>
 
             <sp-app-static-alternatives
                 *ngIf="isAlternativesStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [adapterId]="adapterId"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [eventSchemas]="eventSchemas"
@@ -168,8 +199,6 @@
                 [staticProperties]="staticProperties"
                 [displayRecommended]="displayRecommended"
                 class="test fullWidth"
-                (inputEmitter)="valueChange($event)"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-alternatives>
 
@@ -178,6 +207,10 @@
                     isGroupStaticProperty(staticProperty) &&
                     !isRuntimeResolvableGroupStaticProperty(staticProperty)
                 "
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [adapterId]="adapterId"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [eventSchemas]="eventSchemas"
@@ -187,12 +220,15 @@
                 [staticProperties]="staticProperties"
                 [displayRecommended]="displayRecommended"
                 class="test fullWidth"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-group>
 
             <sp-app-static-runtime-resolvable-group
                 *ngIf="isRuntimeResolvableGroupStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [adapterId]="adapterId"
                 [eventSchemas]="eventSchemas"
@@ -200,15 +236,17 @@
                 [fieldName]="fieldName"
                 [staticProperties]="staticProperties"
                 [staticProperty]="staticProperty"
-                [completedStaticProperty]="completedStaticProperty"
                 [displayRecommended]="displayRecommended"
                 class="test fullWidth"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-app-static-runtime-resolvable-group>
 
             <sp-static-collection
                 *ngIf="isCollectionStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [adapterId]="adapterId"
                 [eventSchemas]="eventSchemas"
@@ -217,27 +255,32 @@
                 [displayRecommended]="displayRecommended"
                 [staticProperty]="staticProperty"
                 class="test fullWidth"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-static-collection>
 
             <sp-static-slide-toggle
                 *ngIf="isSlideToggleStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [adapterId]="adapterId"
                 [parentForm]="parentForm"
                 [fieldName]="fieldName"
                 [displayRecommended]="displayRecommended"
                 [staticProperty]="staticProperty"
                 class="test fullWidth"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-static-slide-toggle>
             <sp-static-runtime-resolvable-tree-input
                 *ngIf="isTreeInputStaticProperty(staticProperty)"
+                (completedConfigurationsEmitter)="
+                    completedConfigurationsEmitter.emit($event)
+                "
+                [completedConfigurations]="completedConfigurations"
                 [deploymentConfiguration]="deploymentConfiguration"
                 [adapterId]="adapterId"
                 [parentForm]="parentForm"
-                [completedStaticProperty]="completedStaticProperty"
                 [pipelineElement]="pipelineElement"
                 [eventSchemas]="eventSchemas"
                 [staticProperties]="staticProperties"
@@ -245,7 +288,6 @@
                 [displayRecommended]="displayRecommended"
                 [staticProperty]="staticProperty"
                 class="test fullWidth"
-                (updateEmitter)="emitUpdate($event)"
             >
             </sp-static-runtime-resolvable-tree-input>
         </div>
diff --git a/ui/src/app/core-ui/static-properties/static-property.component.ts b/ui/src/app/core-ui/static-properties/static-property.component.ts
index ef3ddc9..5d4248d 100644
--- a/ui/src/app/core-ui/static-properties/static-property.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-property.component.ts
@@ -59,19 +59,10 @@
     @Input()
     adapterId: string;
 
-    @Output()
-    validateEmitter: EventEmitter<any> = new EventEmitter<any>();
-
-    @Output()
-    updateEmitter: EventEmitter<ConfigurationInfo> = new EventEmitter();
-
     @Input()
     eventSchemas: EventSchema[];
 
     @Input()
-    completedStaticProperty: ConfigurationInfo;
-
-    @Input()
     parentForm: UntypedFormGroup;
 
     @Input()
@@ -86,6 +77,13 @@
     @Input()
     deploymentConfiguration: ExtensionDeploymentConfiguration;
 
+    @Input()
+    completedConfigurations: ConfigurationInfo[];
+
+    @Output()
+    completedConfigurationsEmitter: EventEmitter<ConfigurationInfo> =
+        new EventEmitter();
+
     showLabel = true;
 
     @Input()
@@ -167,12 +165,4 @@
     isTreeInputStaticProperty(val) {
         return val instanceof RuntimeResolvableTreeInputStaticProperty;
     }
-
-    valueChange(hasInput) {
-        this.validateEmitter.emit();
-    }
-
-    emitUpdate(configurationInfo: ConfigurationInfo) {
-        this.updateEmitter.emit(configurationInfo);
-    }
 }
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
index 6d84538..7b210ff 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-any-input/static-runtime-resolvable-any-input.component.ts
@@ -51,7 +51,7 @@
     selectAll(select: boolean): void {
         this.staticProperty.options.forEach(o => (o.selected = select));
         this.selectedOptions = select ? this.staticProperty.options : [];
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 
     onSelectionChange(): void {
@@ -61,7 +61,7 @@
     }
 
     checkEmitUpdate(): void {
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 
     afterOptionsLoaded(staticProperty: RuntimeResolvableAnyStaticProperty) {
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
index 936c1c0..ec8fef5 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-group/static-runtime-resolvable-group.component.ts
@@ -52,7 +52,7 @@
         if (this.staticProperty.staticProperties.length === 0) {
             this.loadOptionsFromRestApi();
         }
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 
     afterErrorReceived() {}
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
index 19e9a9c..564f2c2 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts
@@ -47,34 +47,18 @@
     extends AbstractStaticPropertyRenderer<T>
     implements OnChanges
 {
-    @Input()
-    completedStaticProperty: ConfigurationInfo;
-
     @Input() deploymentConfiguration: ExtensionDeploymentConfiguration;
 
     showOptions = false;
     loading = false;
     error = false;
     errorMessage: SpLogMessage;
-    dependentStaticProperties: Map<string, boolean> = new Map<
-        string,
-        boolean
-    >();
 
     constructor(private runtimeResolvableService: RuntimeResolvableService) {
         super();
     }
 
-    onInit() {
-        if (
-            this.staticProperty.dependsOn &&
-            this.staticProperty.dependsOn.length > 0
-        ) {
-            this.staticProperty.dependsOn.forEach(dp => {
-                this.dependentStaticProperties.set(dp, false);
-            });
-        }
-    }
+    onInit() {}
 
     loadOptionsFromRestApi(node?: TreeInputNode) {
         const resolvableOptionsParameterRequest = new RuntimeOptionsRequest();
@@ -137,31 +121,14 @@
     }
 
     ngOnChanges(changes: SimpleChanges): void {
-        if (changes['completedStaticProperty']) {
+        if (changes['completedConfigurations']) {
             if (
-                this.completedStaticProperty !== undefined &&
-                !(
-                    this.completedStaticProperty.staticPropertyInternalName ===
-                    this.staticProperty.internalName
+                this.staticPropertyUtils.allDependenciesSatisfied(
+                    this.staticProperty.dependsOn,
+                    this.completedConfigurations,
                 )
             ) {
-                if (
-                    this.dependentStaticProperties.has(
-                        this.completedStaticProperty.staticPropertyInternalName,
-                    )
-                ) {
-                    this.dependentStaticProperties.set(
-                        this.completedStaticProperty.staticPropertyInternalName,
-                        this.completedStaticProperty.configured,
-                    );
-                }
-                if (
-                    Array.from(this.dependentStaticProperties.values()).every(
-                        v => v === true,
-                    )
-                ) {
-                    this.loadOptionsFromRestApi();
-                }
+                this.loadOptionsFromRestApi();
             }
         }
     }
diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
index 88f4dd4..eb6ffcc 100644
--- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-oneof-input/static-runtime-resolvable-oneof-input.component.ts
@@ -54,7 +54,6 @@
             this.staticProperty.options.length > 0
         ) {
             this.staticProperty.options[0].selected = true;
-            this.emitUpdate(true);
         }
     }
 
@@ -66,7 +65,7 @@
             option => option.elementId === id,
         ).selected = true;
         this.performValidation();
-        this.emitUpdate(true);
+        this.applyCompletedConfiguration(true);
     }
 
     parse(
diff --git a/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts b/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
index 2caa19c..f085755 100644
--- a/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-secret-input/static-secret-input.component.ts
@@ -36,25 +36,19 @@
         super();
     }
 
-    @Output() updateEmitter: EventEmitter<ConfigurationInfo> =
-        new EventEmitter();
-
     ngOnInit() {
         this.addValidator(this.staticProperty.value, Validators.required);
         this.enableValidators();
     }
 
     emitUpdate() {
-        this.updateEmitter.emit(
-            new ConfigurationInfo(
-                this.staticProperty.internalName,
+        this.applyCompletedConfiguration(
+            this.staticPropertyUtil.asFreeTextStaticProperty(
+                this.staticProperty,
+            ).value &&
                 this.staticPropertyUtil.asFreeTextStaticProperty(
                     this.staticProperty,
-                ).value &&
-                    this.staticPropertyUtil.asFreeTextStaticProperty(
-                        this.staticProperty,
-                    ).value !== '',
-            ),
+                ).value !== '',
         );
     }
 
diff --git a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
index e74274a..0b0dcce 100644
--- a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
+++ b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts
@@ -37,9 +37,7 @@
     }
 
     emitUpdate() {
-        this.updateEmitter.emit(
-            new ConfigurationInfo(this.staticProperty.internalName, true),
-        );
+        this.applyCompletedConfiguration(true);
     }
 
     onStatusChange(status: any) {}
diff --git a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
index c7c1a4b..8823d51 100644
--- a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
+++ b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
@@ -83,7 +83,11 @@
             this.dataExplorerService.getAllPersistedDataStreams(),
             this.datalakeRestService.getAllMeasurementSeries(),
         ).subscribe(response => {
-            this.availablePipelines = response[0];
+            this.availablePipelines = response[0].filter(
+                p =>
+                    response[1].find(m => m.measureName === p.measureName) !==
+                    undefined,
+            );
             this.availableMeasurements = response[1];
 
             // replace pipeline event schemas. Reason: Available measures do not contain field for timestamp
diff --git a/ui/src/app/editor/dialog/customize/customize.component.html b/ui/src/app/editor/dialog/customize/customize.component.html
index 151d882..4bd6f29 100644
--- a/ui/src/app/editor/dialog/customize/customize.component.html
+++ b/ui/src/app/editor/dialog/customize/customize.component.html
@@ -93,12 +93,11 @@
                                     [eventSchemas]="eventSchemas"
                                     [parentForm]="parentForm"
                                     [fieldName]="config.internalName"
-                                    [completedStaticProperty]="
-                                        completedStaticProperty
+                                    [completedConfigurations]="
+                                        completedConfigurations
                                     "
-                                    (updateEmitter)="triggerUpdate($event)"
-                                    (validateEmitter)="
-                                        validConfiguration($event)
+                                    (completedConfigurationsEmitter)="
+                                        updateCompletedConfiguration($event)
                                     "
                                 >
                                 </sp-app-static-property>
diff --git a/ui/src/app/editor/dialog/customize/customize.component.ts b/ui/src/app/editor/dialog/customize/customize.component.ts
index ae2d73e..5b45b93 100644
--- a/ui/src/app/editor/dialog/customize/customize.component.ts
+++ b/ui/src/app/editor/dialog/customize/customize.component.ts
@@ -42,6 +42,7 @@
 import { ShepherdService } from '../../../services/tour/shepherd.service';
 import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo';
 import { PipelineStyleService } from '../../services/pipeline-style.service';
+import { StaticPropertyUtilService } from '../../../core-ui/static-properties/static-property-util.service';
 
 @Component({
     selector: 'sp-customize-pipeline-element',
@@ -60,13 +61,7 @@
     _showDocumentation = false;
 
     selection: any;
-    matchingSelectionLeft: any;
-    matchingSelectionRight: any;
     invalid: any;
-    helpDialogVisible: any;
-    validationErrors: any;
-
-    sourceEndpoint: any;
     sepa: any;
 
     parentForm: UntypedFormGroup;
@@ -82,6 +77,7 @@
     templateMode = false;
     template: PipelineElementTemplate;
     templateConfigs: Map<string, any>[] = [];
+    completedConfigurations: ConfigurationInfo[] = [];
 
     constructor(
         private dialogRef: DialogRef<CustomizeComponent>,
@@ -91,6 +87,7 @@
         private changeDetectorRef: ChangeDetectorRef,
         private pipelineElementTemplateService: PipelineElementTemplateService,
         private pipelineStyleService: PipelineStyleService,
+        private staticPropertyUtils: StaticPropertyUtilService,
     ) {}
 
     ngOnInit(): void {
@@ -98,6 +95,10 @@
         this.cachedPipelineElement = this.jsPlumbService.clone(
             this.pipelineElement.payload,
         ) as InvocablePipelineElementUnion;
+        this.completedConfigurations =
+            this.staticPropertyUtils.initializeCompletedConfigurations(
+                this.cachedPipelineElement.staticProperties,
+            );
         this.isDataProcessor =
             this.cachedPipelineElement instanceof DataProcessorInvocation;
         this.cachedPipelineElement.inputStreams.forEach(is => {
@@ -147,8 +148,6 @@
         this.dialogRef.close(this.pipelineElement);
     }
 
-    validConfiguration(event: any) {}
-
     set showDocumentation(value: boolean) {
         if (value) {
             this.dialogRef.changeDialogSize({ width: '90vw' });
@@ -174,6 +173,14 @@
         this.completedStaticProperty = { ...configurationInfo };
     }
 
+    updateCompletedConfiguration(configurationInfo: ConfigurationInfo) {
+        this.staticPropertyUtils.updateCompletedConfiguration(
+            configurationInfo,
+            this.completedConfigurations,
+        );
+        this.completedConfigurations = [...this.completedConfigurations];
+    }
+
     triggerTemplateMode() {
         this.template = new PipelineElementTemplate();
         this.templateMode = true;