Amazon Managed Streaming for Apache Kafka (AWS MSK) is a fully managed Apache Kafka service provided by AWS. Doris can import data from AWS MSK in real time through Routine Load and supports the IAM authentication mechanism of AWS MSK. Routine Load supports the CSV and JSON formats and offers Exactly-Once semantics, ensuring that data is neither lost nor duplicated. For more information, refer to Routine Load.
Before using Routine Load to import data from AWS MSK, confirm that the following conditions are met:
The following table lists the authentication-related parameters that need to be configured when importing data from AWS MSK:
| Parameter | Description | Example |
|---|---|---|
| aws.region | AWS Region | "us-east-1" |
| aws.access_key | AWS Access Key ID | - |
| aws.secret_key | AWS Secret Access Key | - |
| aws.role_arn | Role used for cross-account access credentials | "arn:aws:iam::123456789012:role/MyRole" |
| aws.profile_name | AWS Profile name configured in ~/.aws/credentials | - |
| aws.credentials_provider | Standard credentials provider of the AWS SDK. Supports various provider types | "INSTANCE_PROFILE" |
| aws.external_id | Used as the "calling context identifier" for AssumeRole | - |
| property.security.protocol | Due to IAM authentication restrictions, this is fixed to SASL_SSL | "SASL_SSL" |
| property.sasl.mechanism | Due to librdkafka library restrictions, this is fixed to OAUTHBEARER | "OAUTHBEARER" |
aws.credentials_provider supports the following values:

| Value | Description |
|---|---|
| DEFAULT | Use the default provider chain |
| ENV | Read credentials from environment variables |
| INSTANCE_PROFILE | Use EC2 Instance Profile credentials |
Doris supports the following methods for IAM authentication. Choose one according to your actual deployment scenario:
| Authentication method | Applicable scenario |
|---|---|
| Use AK/SK directly | You already have a long-term valid Access Key/Secret Key |
| IAM Role (Assume Role) | Cross-account access, or when temporary credentials are preferred |
| Specify the credential source through aws.credentials_provider | When you do not want to explicitly fill in AK/SK, for example with EC2 Instance Profile |
CREATE ROUTINE LOAD IAM_Test ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.access_key" = "<your-ak>",
    "aws.secret_key" = "<your-sk>",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);
When aws.role_arn is configured, aws.credentials_provider is used to specify the source credential provider used by the STS AssumeRole call.
CREATE ROUTINE LOAD IAM_Test ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.role_arn" = "arn:aws:iam::123456789012:role/demo-role",
    "aws.credentials_provider" = "INSTANCE_PROFILE",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);

CREATE ROUTINE LOAD IAM_Test ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.role_arn" = "arn:aws:iam::123456789012:role/demo-role",
    "aws.credentials_provider" = "ENV",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);

CREATE ROUTINE LOAD IAM_Test ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.role_arn" = "arn:aws:iam::123456789012:role/demo-role",
    "aws.credentials_provider" = "DEFAULT",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);
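The parameter table lists aws.external_id as the calling context identifier for AssumeRole, but none of the examples use it. The following is a minimal sketch of how it might be combined with aws.role_arn, assuming the trust policy of the target role requires an external ID. The job name IAM_Test_ExternalId and the value `<your-external-id>` are placeholders, not values from the original examples.

```sql
-- Sketch only: assume a cross-account role whose trust policy requires an external ID.
CREATE ROUTINE LOAD IAM_Test_ExternalId ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.role_arn" = "arn:aws:iam::123456789012:role/demo-role",
    -- Placeholder: must match the external ID expected by the role's trust policy.
    "aws.external_id" = "<your-external-id>",
    -- Source credentials used for the STS AssumeRole call.
    "aws.credentials_provider" = "INSTANCE_PROFILE",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);
```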
Specifying the credential source through aws.credentials_provider alone is suitable for scenarios where AK/SK is not explicitly provided, such as EC2 Instance Profile.
CREATE ROUTINE LOAD IAM_Test ON t
COLUMNS TERMINATED BY ",",
COLUMNS(a,b)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.credentials_provider" = "INSTANCE_PROFILE",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);
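The examples above all load CSV data. Because Routine Load also supports JSON, a JSON variant might look like the sketch below. It assumes that each Kafka message is a flat JSON object whose keys a and b match the column names of table t. The job name IAM_Test_Json is a placeholder, and the authentication properties can be swapped for any of the methods shown earlier.

```sql
-- Sketch only: same IAM settings as the Instance Profile example, but for JSON messages.
CREATE ROUTINE LOAD IAM_Test_Json ON t
COLUMNS(a,b)
PROPERTIES
(
    -- Tell Routine Load to parse each message as JSON instead of CSV.
    "format" = "json"
)
FROM KAFKA(
    "kafka_broker_list" = "your_msk_broker_list",
    "kafka_topic" = "your_kafka_topic",
    "aws.region" = "us-west-1",
    "aws.credentials_provider" = "INSTANCE_PROFILE",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "OAUTHBEARER"
);
```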
When multiple credential parameters are configured at the same time, they take effect in the following order of priority:
1. If aws.access_key and aws.secret_key are configured, AK/SK is used first.
2. Otherwise, if aws.role_arn is configured, the IAM Role is used. In this case, aws.credentials_provider is used to select the STS source credential.
3. If neither AK/SK nor aws.role_arn is configured, aws.credentials_provider directly determines the provider used by the AWS client.

For users who want to access AWS MSK from a public network environment, if AWS authentication issues occur during data import, troubleshoot using the following steps:
In the AWS MSK console, select the cluster you want to access, open Properties, and under Networking settings choose Edit public access. Make sure that the public access option is turned on.
The subnet associated with the cluster must be public. In the AWS VPC console, ensure that the route table entries of the subnet contain the 0.0.0.0/0 : igw-xxxx entry.
In the AWS MSK console, select the cluster you want to access, click View client information, and ensure that the kafka_broker_list property used when creating the Routine Load is filled in with the public endpoint rather than the private endpoint.
Check the inbound rules of the security group configured for MSK to see whether an appropriate source IP is configured for port 9198.
Note: If you communicate with the broker through IAM access control, you need to expose access through port 9198.
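After adjusting the network settings, the job status can be checked from the Doris side. The statements below are a minimal sketch that reuses the job name IAM_Test from the examples above. The exact output columns depend on the Doris version, but typically include the job state and the reason for the most recent state change.

```sql
-- Inspect the current state of the job and the reason it last changed state.
SHOW ROUTINE LOAD FOR IAM_Test;

-- If the job was paused by the earlier authentication failures,
-- resume it once the public access settings have been corrected.
RESUME ROUTINE LOAD FOR IAM_Test;
```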
For more detailed information, refer to the related AWS documents:
Why must property.security.protocol and property.sasl.mechanism be fixed values?

Due to the AWS MSK IAM authentication mechanism and the limitations of the underlying librdkafka library, these two parameters must be fixed to SASL_SSL and OAUTHBEARER respectively. Otherwise, the IAM authentication handshake cannot be completed successfully.

If AK/SK and aws.role_arn are configured at the same time, which credential is used?

AK/SK is used first. For details, see Credential Resolution Rules.
Follow the four steps in Public Network Access Troubleshooting and check each item: whether public access is enabled, whether the subnet is public, whether the Bootstrap endpoint uses the public endpoint, and whether the security group allows port 9198.
You can bind an IAM Role with MSK access permissions to the EC2 instance, and then set aws.credentials_provider to INSTANCE_PROFILE. Refer to Method 3.
Yes. Routine Load provides Exactly-Once semantics, ensuring that data is neither lost nor duplicated.