| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.kafkaconnector; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.camel.Exchange; |
| import org.apache.camel.ProducerTemplate; |
| import org.apache.camel.kafkaconnector.utils.CamelMainSupport; |
| import org.apache.camel.kafkaconnector.utils.TaskHelper; |
| import org.apache.camel.support.DefaultExchange; |
| import org.apache.kafka.connect.data.Schema; |
| import org.apache.kafka.connect.data.SchemaBuilder; |
| import org.apache.kafka.connect.errors.ConnectException; |
| import org.apache.kafka.connect.header.Header; |
| import org.apache.kafka.connect.sink.SinkRecord; |
| import org.apache.kafka.connect.sink.SinkTask; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class CamelSinkTask extends SinkTask { |
| public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key"; |
| private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = "camel.sink.endpoint."; |
| private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = "camel.sink.path."; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class); |
| |
| private static final String LOCAL_URL = "direct:start"; |
| private static final String HEADER_CAMEL_PREFIX = "CamelHeader"; |
| private static final String PROPERTY_CAMEL_PREFIX = "CamelProperty"; |
| |
| private CamelMainSupport cms; |
| private ProducerTemplate producer; |
| private CamelSinkConnectorConfig config; |
| |
| @Override |
| public String version() { |
| return new CamelSinkConnector().version(); |
| } |
| |
| @Override |
| public void start(Map<String, String> props) { |
| try { |
| LOG.info("Starting CamelSinkTask connector task"); |
| Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props); |
| config = getCamelSinkConnectorConfig(actualProps); |
| |
| String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); |
| final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF); |
| |
| if (remoteUrl == null) { |
| remoteUrl = TaskHelper.buildUrl(actualProps, config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF), CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX); |
| } |
| |
| cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null); |
| |
| producer = cms.createProducerTemplate(); |
| |
| cms.start(); |
| LOG.info("CamelSinkTask connector task started"); |
| } catch (Exception e) { |
| throw new ConnectException("Failed to create and start Camel context", e); |
| } |
| } |
| |
| protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> props) { |
| return new CamelSinkConnectorConfig(props); |
| } |
| |
| protected Map<String, String> getDefaultConfig() { |
| return Collections.EMPTY_MAP; |
| } |
| |
| protected static String getCamelSinkEndpointConfigPrefix() { |
| return CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX; |
| } |
| |
| protected static String getCamelSinkPathConfigPrefix() { |
| return CAMEL_SINK_PATH_PROPERTIES_PREFIX; |
| } |
| |
| @Override |
| public void put(Collection<SinkRecord> sinkRecords) { |
| Map<String, Object> headers = new HashMap<String, Object>(); |
| Exchange exchange = new DefaultExchange(producer.getCamelContext()); |
| for (SinkRecord record : sinkRecords) { |
| headers.put(KAFKA_RECORD_KEY_HEADER, record.key()); |
| for (Iterator<Header> iterator = record.headers().iterator(); iterator.hasNext();) { |
| Header header = (Header)iterator.next(); |
| if (header.key().startsWith(HEADER_CAMEL_PREFIX)) { |
| addHeader(headers, header); |
| } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) { |
| addProperty(exchange, header); |
| } |
| } |
| exchange.getMessage().setHeaders(headers); |
| exchange.getMessage().setBody(record.value()); |
| LOG.debug("Sending {} to {}", exchange, LOCAL_URL); |
| producer.send(LOCAL_URL, exchange); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| LOG.info("Stopping CamelSinkTask connector task"); |
| try { |
| cms.stop(); |
| } catch (Exception e) { |
| throw new ConnectException("Failed to stop Camel context", e); |
| } finally { |
| LOG.info("CamelSinkTask connector task stopped"); |
| } |
| } |
| |
| private void addHeader(Map<String, Object> map, Header singleHeader) { |
| Schema schema = singleHeader.schema(); |
| if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (String)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (Boolean)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (byte[])singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (float)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (double)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (short)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (long)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) { |
| map.put(singleHeader.key(), (byte)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) { |
| map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { |
| map.put(singleHeader.key(), (List<?>)singleHeader.value()); |
| } |
| } |
| |
| private void addProperty(Exchange exchange, Header singleHeader) { |
| Schema schema = singleHeader.schema(); |
| if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (String)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (Boolean)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (byte[])singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (float)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (double)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (short)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (long)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (byte)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); |
| } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { |
| exchange.getProperties().put(singleHeader.key(), (List<?>)singleHeader.value()); |
| } |
| } |
| |
| public CamelMainSupport getCms() { |
| return cms; |
| } |
| } |