blob: 70cc921be324791d25e528224d31bd57c0e8d250 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.ce;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.cloudevents.CloudEvent;
import org.apache.camel.component.knative.KnativeEndpoint;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
private final CloudEvent cloudEvent;
protected AbstractCloudEventProcessor(CloudEvent cloudEvent) {
this.cloudEvent = cloudEvent;
}
@Override
public CloudEvent cloudEvent() {
return cloudEvent;
}
@SuppressWarnings("unchecked")
@Override
public Processor consumer(KnativeEndpoint endpoint, KnativeResource service) {
return exchange -> {
if (Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_BATCH_CONTENT_MODE)) {
throw new UnsupportedOperationException("Batched CloudEvents are not yet supported");
}
if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
final Map<String, Object> headers = exchange.getIn().getHeaders();
for (CloudEvent.Attribute attribute: cloudEvent.attributes()) {
Object val = headers.remove(attribute.http());
if (val != null) {
headers.put(attribute.id(), val);
}
}
} else {
try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class));
}
}
};
}
protected abstract void decodeStructuredContent(Exchange exchange, Map<String, Object> content);
@Override
public Processor producer(KnativeEndpoint endpoint, KnativeResource service) {
final CloudEvent ce = cloudEvent();
final Logger logger = LoggerFactory.getLogger(getClass());
final String contentType = service.getContentType();
return exchange -> {
final Map<String, Object> headers = exchange.getMessage().getHeaders();
for (CloudEvent.Attribute attribute: ce.attributes()) {
Object value = headers.get(attribute.id());
if (value != null) {
headers.putIfAbsent(attribute.http(), value);
}
}
if (contentType != null) {
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
}
//
// in case of events, if the type of the event is defined as URI param so we need
// to override it to avoid the event type be overridden by Messages's headers
//
if (endpoint.getType() == Knative.Type.event && endpoint.getTypeId() != null) {
final Object eventType = headers.get(CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
if (eventType != null) {
logger.debug("Detected the presence of {} header with value {}: it will be ignored and replaced by value set as uri parameter {}",
CloudEvent.CAMEL_CLOUD_EVENT_TYPE,
eventType,
endpoint.getTypeId());
}
headers.put(cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), endpoint.getTypeId());
} else {
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> {
String eventType = service.getCloudEventType();
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
}
return eventType;
});
}
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID, exchange::getExchangeId);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, exchange::getFromRouteId);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce::version);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, () -> {
final ZonedDateTime created = ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
return eventTime;
});
headers.putAll(service.getCeOverrides());
};
}
protected void setCloudEventHeader(Map<String, Object> headers, String id, Supplier<Object> supplier) {
headers.putIfAbsent(cloudEvent().mandatoryAttribute(id).http(), supplier.get());
}
}