# blob: 3e980839d6326b16a3075400bc172877533208a5 [file] [log] [blame]
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------
# Sink Kamelet: takes exchanges arriving on `kamelet:source`, maps optional
# key / partition-key headers onto the Kafka producer headers, and sends the
# message to the configured topic through the `kafka` component. Values are
# serialized with the class named by `valueSerializer`, and the Azure Schema
# Registry settings are passed to the Kafka client as additional properties.
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: kafka-azure-schema-registry-sink
  annotations:
    camel.apache.org/kamelet.support.level: "Stable"
    camel.apache.org/catalog.version: "4.6.0-SNAPSHOT"
    camel.apache.org/kamelet.icon: ""
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "Kafka"
    camel.apache.org/kamelet.namespace: "Kafka"
  labels:
    camel.apache.org/kamelet.type: "sink"
spec:
  # User-facing parameter schema. Every property below is referenced in the
  # template as a `{{name}}` placeholder.
  definition:
    title: "Azure Kafka through Eventhubs with Azure Schema Registry Sink"
    description: |-
      Send data to Kafka topics on Azure Eventhubs combined with Azure Schema Registry.
      The Kamelet is able to understand the following headers to be set:
      - `key` / `ce-key`: as message key
      - `partition-key` / `ce-partitionkey`: as message partition key
      Both the headers are optional.
    required:
      - topic
      - bootstrapServers
      - azureRegistryUrl
      - password
    type: object
    properties:
      topic:
        title: Topic Names
        description: Comma separated list of Kafka topic names
        type: string
      bootstrapServers:
        title: Bootstrap Servers
        description: Comma separated list of Kafka Broker URLs
        type: string
      securityProtocol:
        title: Security Protocol
        description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported
        type: string
        default: SASL_SSL
      saslMechanism:
        title: SASL Mechanism
        description: The Simple Authentication and Security Layer (SASL) Mechanism used.
        type: string
        default: PLAIN
      password:
        title: Password
        description: Password to authenticate to kafka
        type: string
        format: password
        x-descriptors:
          - urn:camel:group:credentials
      # This is the producer-side serializer (it is passed to the `kafka`
      # endpoint's `valueSerializer` option), so the title and description
      # talk about serialization, not deserialization.
      valueSerializer:
        title: Value Serializer
        description: Serializer class for value that implements the Serializer interface.
        type: string
        default: "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer"
      azureRegistryUrl:
        title: Azure Schema Registry URL
        description: The Azure Schema Registry URL
        type: string
        # NOTE(review): these KEDA descriptors name `bootstrapServers` but are
        # attached to the registry URL property - confirm this is intended and
        # not a misplaced copy from the `bootstrapServers` property.
        x-descriptors:
          - urn:keda:metadata:bootstrapServers
          - urn:keda:required
      # NOTE(review): not listed under `required` and has no default, yet the
      # template references `{{specificAvroValueType}}` unconditionally -
      # confirm the behaviour when this parameter is omitted.
      specificAvroValueType:
        title: Specific Avro Value Type
        description: The Specific Type Avro will have to deal with
        type: string
        example: "com.example.Order"
  dependencies:
    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.6.0-SNAPSHOT"
    - "camel:core"
    - "camel:kafka"
    - "camel:kamelet"
    - "camel:azure-schema-registry"
    - "mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1"
    - "mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.14"
    - "mvn:com.azure:azure-identity:1.11.2"
  template:
    beans:
      # Credential bean referenced below as `#bean:defaultAzureCredential`.
      - name: defaultAzureCredential
        type: "#class:org.apache.camel.component.azure.schema.registry.DefaultAzureCredentialWrapper"
    from:
      uri: "kamelet:source"
      steps:
        # Message key: use the `key` header when set, otherwise `ce-key`.
        # If neither is present, `kafka.KEY` is left untouched.
        - choice:
            when:
              - simple: "${header[key]}"
                steps:
                  - setHeader:
                      name: kafka.KEY
                      simple: "${header[key]}"
              - simple: "${header[ce-key]}"
                steps:
                  - setHeader:
                      name: kafka.KEY
                      simple: "${header[ce-key]}"
        # Partition key: use `partition-key` when set, otherwise
        # `ce-partitionkey`.
        - choice:
            when:
              - simple: "${header[partition-key]}"
                steps:
                  - setHeader:
                      name: kafka.PARTITION_KEY
                      simple: "${header[partition-key]}"
              - simple: "${header[ce-partitionkey]}"
                steps:
                  - setHeader:
                      name: kafka.PARTITION_KEY
                      simple: "${header[ce-partitionkey]}"
        - to:
            uri: "kafka:{{topic}}"
            parameters:
              brokers: "{{bootstrapServers}}"
              securityProtocol: "{{securityProtocol}}"
              saslMechanism: "{{saslMechanism}}"
              # The user name is the literal text `$ConnectionString` (the
              # scalar is single-quoted, so nothing is interpolated into it).
              # NOTE(review): `{{password}}` is inserted WITHOUT surrounding
              # double quotes, unlike `username`. A value containing `;` or
              # `=` (as an Event Hubs connection string does) would presumably
              # need to be supplied already quoted - confirm the expected
              # input format before changing this string.
              saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{password}};'
              valueSerializer: "{{valueSerializer}}"
              # `additionalProperties.*` entries: presumably forwarded to the
              # Kafka client with the prefix removed - see the camel-kafka
              # component documentation.
              additionalProperties.schema.registry.url: "{{azureRegistryUrl}}"
              additionalProperties.schema.group: avro
              additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
              additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):{{specificAvroValueType}}'