PIP 289: Secure Pulsar Connector Configuration

Background knowledge

Pulsar Sinks and Sources (a.k.a. Connectors) allow you to move data between remote systems and a Pulsar cluster. These remote systems often require authentication, which in turn requires secret management.

The current state of Pulsar Connector secret management is fragmented, is not documented in the “Pulsar IO” docs, and is not possible in certain cases. This PIP aims to address these issues through several changes.

The easiest way to show the current shortcomings is by example.

Elasticsearch Example

Here is the current way to deploy an Elasticsearch Sink without the use of plaintext secrets:

$ bin/pulsar-admin sinks create \
    --tenant public \
    --namespace default \
    --sink-type elastic_search \
    --name elasticsearch-test-sink \
    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index"}' \
    --secrets '{"username": {"MY-K8S-SECRET-USERNAME": "secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"}}' \
    --inputs elasticsearch_test

When run against the Kubernetes runtime, the above works by mounting the secrets MY-K8S-SECRET-USERNAME and MY-K8S-SECRET-PASSWORD into the sink pod's container as environment variables:

username=secret-name
password=password123

Those environment variables are then injected into the config when it is loaded at runtime based on annotations on the ElasticSearchConfig.

Problem

The annotation approach, which is the only way to inject secrets into connectors, requires that all secret fields are annotated with sensitive = true and that all secret fields are at the top level of their configuration class. However, the Elasticsearch config contains an ssl field that has nested secrets. See:

{
      "elasticSearchUrl": "http://localhost:9200",
      "indexName": "my_index",
      "username": "username",
      "password": "password",
      "ssl": {
        "enabled": true,
        "truststorePath": "/pulsar/security/truststore.jks",
        "truststorePassword": "truststorepass",
        "keystorePath": "/pulsar/security/keystore.jks",
        "keystorePassword": "keystorepass"
      }
}

Because truststorePassword and keystorePassword are not at the top level, we do not currently have a secure way (i.e. non-plaintext) to configure those settings.
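
The limitation described above can be illustrated with a minimal sketch. The Sensitive annotation, the EsConfig and SslConfig classes, and the loadTopLevelSecrets helper are hypothetical stand-ins rather than the actual Pulsar classes. The sketch only assumes that the loader inspects annotated fields declared directly on the configuration class, which is why a nested password is never filled in:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a field annotation carrying sensitive = true.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Sensitive {}

// Hypothetical nested config: the field is annotated but never visited by the loader.
class SslConfig {
    @Sensitive String truststorePassword;
}

// Hypothetical top-level config class.
class EsConfig {
    String elasticSearchUrl;
    @Sensitive String password;
    SslConfig ssl = new SslConfig();
}

class TopLevelSecretLoader {
    // Fills only the annotated String fields declared directly on the config class.
    static void loadTopLevelSecrets(Object config, Function<String, String> secrets)
            throws IllegalAccessException {
        for (Field field : config.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Sensitive.class) && field.getType() == String.class) {
                String secret = secrets.apply(field.getName());
                if (secret != null) {
                    field.setAccessible(true);
                    field.set(config, secret);
                }
            }
        }
    }

    // Loads a config from a fixed map that stands in for the pod's environment variables.
    static EsConfig demo() {
        Map<String, String> env = Map.of(
                "password", "password123",
                "truststorePassword", "truststorepass");
        EsConfig config = new EsConfig();
        try {
            loadTopLevelSecrets(config, env::get);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return config;
    }
}
```

In this sketch, demo() returns a config whose password field is populated, while ssl.truststorePassword stays unset even though a matching secret is available, because the loop never descends into the ssl object.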

RabbitMQ Example

Another relevant example shows that the Pulsar code base has not implemented secret management consistently across connectors. For the RabbitMQ Sink, the sensitive fields are annotated correctly, but the configuration is not loaded via the IOConfigUtils#loadWithSecrets method, which means the only way to supply RabbitMQ secrets is as plaintext values in the config.

Kafka Connect Adapter Example

The final relevant example is the Kafka Connect Adapter. This adapter allows you to run Kafka Connectors in Pulsar Connectors. Because of the recursive nature of these connectors, the configuration for the wrapped connector is stored in a map named kafkaConnectorConfigProperties. Because this field is an arbitrary map, we cannot rely on the Pulsar sensitive annotation flag to determine whether to load the secret when building the config class.

Motivation

Increase Pulsar Function security by giving users a way to configure Pulsar Connectors with non-plaintext secrets.

The recent CVE-2023-37579 made it possible to leak connector configurations. Because we do not always provide a way to supply sensitive settings through the connector's secrets map, leaking the configuration meant leaking secrets.

Goals

In Scope

  • Provide users with a secure way to configure official Pulsar Connectors as well as third party connectors.
  • Improve documentation to reflect the current state of secrets management in Pulsar Connectors.
  • Only sinks and sources will benefit from this change.
  • Only the JavaInstanceRunnable class will benefit from this change.

Out of Scope

  • This PIP will not prevent users from configuring secrets via insecure methods, such as plaintext configuration.
  • Functions are out of scope because they do not need arbitrary secret injection. Functions can already access secrets through the Context#getSecret method.
  • Python and Go Function Runtimes are out of scope because sinks and sources are not typically written in these languages.

High Level Design

  • Add a new secrets injection mechanism which allows for arbitrary secret injection into the connector configuration at runtime.
  • Update existing, official connectors to properly use the already available secret injection mechanism.
  • Fix the documentation for the existing secrets management methods.

Detailed Design

Design & Implementation Details

In order to add a new way to inject, or interpolate, secrets, we need to add a new method to the SecretsProvider interface, which can be implemented by users but is not exposed to function/connector runtimes. This new method will first determine whether a secret should be interpolated for a given value and, if so, return the interpolated value. If the value is not a secret reference, or the secret does not exist, the method will return null and no interpolation will occur.

The notable difference for this method is that it does not have a “path” to the secret. Therefore, the existing secrets map might not apply for certain use cases. In the environment variable scenario, this is a natural fit because the value can be interpreted as the name of the environment variable. For usage of the new configuration mechanism, see the CLI section.

In the event of a value collision between the old way and this new way to inject secrets, the old way will take precedence.
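
One way the interpolation could be applied to a nested configuration is sketched below. The ConfigInterpolator class, its interpolate helper, and the Function<String, String> lookup parameter (standing in for interpolateSecretForValue) are assumptions made for illustration; this PIP does not prescribe the traversal. The sketch does not model the precedence rule for collisions with the existing secrets map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class ConfigInterpolator {
    // Returns a copy of the config in which every String value that the lookup
    // resolves is replaced by the secret; nested maps are visited recursively.
    static Map<String, Object> interpolate(Map<String, Object> config,
                                           Function<String, String> lookup) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String) {
                String secret = lookup.apply((String) value);
                // A null result means "not a secret reference": keep the original value.
                result.put(entry.getKey(), secret != null ? secret : value);
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                result.put(entry.getKey(), interpolate(nested, lookup));
            } else {
                // Booleans, numbers, and other values are copied unchanged.
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }
}
```

Because the helper recurses into nested maps, a value such as the keystorePassword inside the ssl object is reachable, which the top-level annotation approach cannot do. Values inside lists are left untouched in this sketch.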

In order to add support for the existing sensitive annotation, I propose fixing all the connectors that have explicit secrets in their configurations.

Fixing the documentation will be a matter of updating the existing documentation to reflect the current state of the code.

Public-facing Changes

Public API

Add new method to SecretsProvider Interface

Add the following method to the SecretsProvider interface:

interface SecretsProvider {
    /**
     * If the passed value is formatted as a reference to a secret, as defined by the implementation, return the
     * referenced secret. If the value is not formatted as a secret reference or the referenced secret does not exist,
     * return null.
     *
     * @param value a config value that may be formatted as a reference to a secret
     * @return the materialized secret. Otherwise, null.
     */
    default String interpolateSecretForValue(String value) {
        return null;
    }
}

There are only two official implementations of the SecretsProvider interface: the ClearTextSecretsProvider and the EnvironmentBasedSecretsProvider. Given that the ClearTextSecretsProvider is only plaintext, it will not override the new method. Here is the proposed implementation for the EnvironmentBasedSecretsProvider:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EnvironmentBasedSecretsProvider implements SecretsProvider {
    /**
     * Pattern to match ${secretName} in the value.
     */
    private static final Pattern INTERPOLATION_PATTERN = Pattern.compile("\\$\\{(.+?)}");

    // Existing methods, including provideSecret, are omitted here.

    @Override
    public String interpolateSecretForValue(String value) {
        // The whole value must be a ${secretName} reference for interpolation to occur.
        Matcher m = INTERPOLATION_PATTERN.matcher(value);
        if (m.matches()) {
            String secretName = m.group(1);
            // If the secret doesn't exist, we return null and don't override the current value.
            return provideSecret(secretName, null);
        }
        return null;
    }
}
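
To make the matching behavior of the proposed pattern concrete, here is a small standalone sketch. The InterpolationPatternDemo class and the secretNameOf helper are hypothetical and exist only for this illustration; the regular expression is the same one used in the proposed EnvironmentBasedSecretsProvider.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class InterpolationPatternDemo {
    // Same regular expression as in the proposed EnvironmentBasedSecretsProvider.
    private static final Pattern INTERPOLATION_PATTERN = Pattern.compile("\\$\\{(.+?)}");

    // Returns the referenced secret name, or null when the value is not a reference.
    static String secretNameOf(String value) {
        Matcher m = INTERPOLATION_PATTERN.matcher(value);
        // matches() requires the entire value to fit the pattern, not just a substring.
        return m.matches() ? m.group(1) : null;
    }
}
```

With this helper, "${password}" yields password, while "password", "${}", and "prefix-${password}" all yield null, so a reference embedded in a longer string is not interpolated. One edge case worth noting: because matches() must consume the whole input, a value such as "${a}${b}" is treated as a single reference to a secret named a}${b rather than as two references.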

Binary protocol

No change.

Configuration

There is no new configuration for this change. It is always enabled.

CLI

  • Here is the new way that users will map secrets into nested configs:

    $ bin/pulsar-admin sinks create \
        --tenant public \
        --namespace default \
        --sink-type elastic_search \
        --name elasticsearch-test-sink \
        --sink-config '{
          "elasticSearchUrl": "http://localhost:9200",
          "indexName": "my_index",
          "username": "${username}",
          "password": "${password}",
          "ssl": {
            "enabled": true,
            "truststorePath": "/pulsar/security/truststore.jks",
            "truststorePassword": "${truststorepass}",
            "keystorePath": "/pulsar/security/keystore.jks",
            "keystorePassword": "${keystorePassword}"
          }
        }' \
        --secrets '{"username": {"MY-K8S-SECRET-USERNAME": "secret-name"},"password": {"MY-K8S-SECRET-PASSWORD": "password123"},"keystorePassword": {"MY-K8S-KEYSTORE-PASS": "xyz"},"truststorepass": {"MY-K8S-TRUSTSTORE-PASS": "abc"}}' \
        --inputs elasticsearch_test
    

Metrics

No new metrics are added by this change.

Monitoring

Not applicable.

Security Considerations

The primary security consideration is whether there is any risk in giving users a way to interpolate environment variables into their connector. This change only affects the EnvironmentBasedSecretsProvider, which is only used by the Kubernetes Function runtime. As such, there are no environment variables to leak. Further, all connectors have access to their environment variables, so no additional risk is present.

Backward & Forward Compatibility

Revert

Reverting this change is as simple as downgrading the function worker and stopping then starting the function.

Upgrade

Upgrade by upgrading the function worker and stopping then starting the function. Also, the user will need to update their connector configuration to use the new syntax.

Alternatives

While exploring this PIP, I considered several alternatives.

Merge Secret Map into Config Map

Attempt to merge all secrets configured for the connector into the connector's configuration. See https://github.com/apache/pulsar/pull/20863 for an example of this approach.

The primary issue with this design is the fact that the secrets map configured for a connector is of type Map<String, Object> where the keys are meant to be top level fields in the connector configuration and the values are paths to the secrets. As such, we cannot use the secrets map to recursively inject secrets into the config, which is a requirement for some connectors.

Directly Inject Secrets into Config Map Based on Value Prefix

We could consider interpreting configuration values that start with a well known prefix, like env:, as values that need to be read from the environment. The primary drawback to this solution is that there is not an easy way to configure the function at this point in the code, which means that it is always on.

This solution would look something like adding this code block

    // Replace environment variable pointers with their environment variable values
    for (Map.Entry<String, Object> entry : config.entrySet()) {
        if (entry.getValue() instanceof String && ((String) entry.getValue()).toLowerCase().startsWith("env:")) {
            String envVariableName = ((String) entry.getValue()).substring("env:".length());
            String envVariableValue = System.getenv(envVariableName);
            entry.setValue(envVariableValue);
        }
    }

to this method: https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L884-L929.

General Notes

Links