blob: d4b0cfa592584a033ad3da3df0637897dad2b03f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.restassured.RestAssured;
import io.restassured.mapper.ObjectMapperType;
import io.vertx.core.http.HttpServerRequest;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToStartRouteException;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cloudevents.CloudEvent;
import org.apache.camel.component.cloudevents.CloudEvents;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.KnativeEndpoint;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.http.base.HttpOperationFailedException;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.k.test.AvailablePortFinder;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static io.restassured.RestAssured.config;
import static io.restassured.RestAssured.given;
import static io.restassured.config.EncoderConfig.encoderConfig;
import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent;
import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configurePlatformHttpComponent;
import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.httpAttribute;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.channel;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.endpoint;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.event;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.sourceChannel;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.sourceEndpoint;
import static org.apache.camel.component.knative.test.KnativeEnvironmentSupport.sourceEvent;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.is;
public class KnativeHttpTest {
private CamelContext context;
private ProducerTemplate template;
private int platformHttpPort;
private String platformHttpHost;
// **************************
//
// Setup
//
// **************************
@BeforeEach
public void before() {
this.context = new DefaultCamelContext();
this.template = this.context.createProducerTemplate();
this.platformHttpHost = "localhost";
this.platformHttpPort = AvailablePortFinder.getNextAvailable();
configurePlatformHttpComponent(context, this.platformHttpPort);
RestAssured.port = platformHttpPort;
RestAssured.config = config().encoderConfig(encoderConfig().appendDefaultContentCharsetToContentTypeIfUndefined(false));
}
@AfterEach
public void after() {
ServiceHelper.stopService(template);
if (this.context != null) {
this.context.stop();
}
}
// **************************
//
// Tests
//
// **************************
@Test
void testCreateComponent() {
context.start();
assertThat(context.getComponent("knative")).isInstanceOfSatisfying(KnativeComponent.class, c -> {
assertThat(c.getProducerFactory()).isInstanceOf(KnativeHttpProducerFactory.class);
assertThat(c.getConsumerFactory()).isInstanceOf(KnativeHttpConsumerFactory.class);
});
}
void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exception {
KnativeComponent component = configureKnativeComponent(
context,
ce,
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.SERVICE_META_PATH, ObjectHelper.supplyIfEmpty(path, () -> "/"),
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
if (ObjectHelper.isNotEmpty(basePath)) {
component.getConfiguration().addTransportOptions("basePath", basePath);
}
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/myEndpoint")
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
String targetPath = ObjectHelper.supplyIfEmpty(path, () -> "/");
if (ObjectHelper.isNotEmpty(basePath)) {
targetPath = basePath + targetPath;
}
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post(targetPath)
.then()
.statusCode(200);
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testKnativeSource(CloudEvent ce) throws Exception {
doTestKnativeSource(ce, null, null);
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testKnativeSourceWithPath(CloudEvent ce) throws Exception {
doTestKnativeSource(ce, null, "/a/path");
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testKnativeSourceWithBasePath(CloudEvent ce) throws Exception {
doTestKnativeSource(ce, "/base", null);
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testKnativeSourceWithBasePathAndPath(CloudEvent ce) throws Exception {
doTestKnativeSource(ce, "/base", "/a/path");
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeEndpoint(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
String.format("http://%s:%d/a/path", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.routeId("my-source")
.to("knative:endpoint/myEndpoint");
b.from("platform-http:/a/path")
.convertBodyTo(String.class)
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "my-source");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeEndpointByUrl(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
null,
Map.of(
Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort),
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.routeId("my-source")
.to("knative:endpoint/myEndpoint");
b.from("platform-http:/a/path")
.convertBodyTo(String.class)
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "my-source");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeEndpointByUrlAndPath(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
null,
Map.of(
Knative.SERVICE_META_PATH, "/with/subpath",
Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort),
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.routeId("my-source")
.to("knative:endpoint/myEndpoint");
b.from("platform-http:/a/path/with/subpath")
.convertBodyTo(String.class)
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "my-source");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeStructuredContent(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/myEndpoint")
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
if (Objects.equals(CloudEvents.v0_1.version(), ce.version())) {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
Map.of(
"cloudEventsVersion", ce.version(),
"eventType", "org.apache.camel.event",
"eventID", "myEventID",
"eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
"source", "/somewhere",
"contentType", "text/plain",
"data", "test"
),
ObjectMapperType.JACKSON_2
)
.when()
.post()
.then()
.statusCode(200);
} else if (Objects.equals(CloudEvents.v0_2.version(), ce.version())) {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
"time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
"source", "/somewhere",
"contenttype", "text/plain",
"data", "test"
),
ObjectMapperType.JACKSON_2
)
.when()
.post()
.then()
.statusCode(200);
} else if (Objects.equals(CloudEvents.v0_3.version(), ce.version())) {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
"time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
"source", "/somewhere",
"datacontenttype", "text/plain",
"data", "test"
),
ObjectMapperType.JACKSON_2
)
.when()
.post()
.then()
.statusCode(200);
} else if (Objects.equals(CloudEvents.v1_0.version(), ce.version())) {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
"time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
"source", "/somewhere",
"datacontenttype", "text/plain",
"data", "test"
),
ObjectMapperType.JACKSON_2
)
.when()
.post()
.then()
.statusCode(200);
} else {
throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version());
}
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeContent(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/myEndpoint")
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(200);
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeContentWithFilter(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"ep1",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1"
)),
sourceEndpoint(
"ep2",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/ep1")
.convertBodyTo(String.class)
.to("log:ce1?showAll=true&multiline=true")
.to("mock:ce1");
b.from("knative:endpoint/ep2")
.convertBodyTo(String.class)
.to("log:ce2?showAll=true&multiline=true")
.to("mock:ce2");
});
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE1");
mock1.expectedBodiesReceived("test");
mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE2");
mock2.expectedBodiesReceived("test");
mock2.expectedMessageCount(1);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
.when()
.post()
.then()
.statusCode(200);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
.when()
.post()
.then()
.statusCode(200);
mock1.assertIsSatisfied();
mock2.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"ep1",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[01234]"
)),
sourceEndpoint(
"ep2",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[56789]"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/ep1")
.convertBodyTo(String.class)
.to("log:ce1?showAll=true&multiline=true")
.to("mock:ce1");
b.from("knative:endpoint/ep2")
.convertBodyTo(String.class)
.to("log:ce2?showAll=true&multiline=true")
.to("mock:ce2");
});
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE0");
mock1.expectedBodiesReceived("test");
mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE5");
mock2.expectedBodiesReceived("test");
mock2.expectedMessageCount(1);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE0")
.when()
.post()
.then()
.statusCode(200);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE5")
.when()
.post()
.then()
.statusCode(200);
mock1.assertIsSatisfied();
mock2.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeEventContent(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEvent("default")
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:event/event1")
.convertBodyTo(String.class)
.to("log:ce1?showAll=true&multiline=true")
.to("mock:ce1");
b.from("knative:event/event2")
.convertBodyTo(String.class)
.to("log:ce2?showAll=true&multiline=true")
.to("mock:ce2");
});
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "event1");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID1");
mock1.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE1");
mock1.expectedBodiesReceived("test");
mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "event2");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID2");
mock2.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE2");
mock2.expectedBodiesReceived("test");
mock2.expectedMessageCount(1);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event1")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
.when()
.post()
.then()
.statusCode(200);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event2")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
.when()
.post()
.then()
.statusCode(200);
mock1.assertIsSatisfied();
mock2.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testReply(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"from",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event.from",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event.to",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/from")
.convertBodyTo(String.class)
.setBody()
.constant("consumer")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
.constant("custom");
b.from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
.to("mock:to");
});
MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event.to");
mock.expectedMessageCount(1);
context.start();
template.sendBody("direct:source", "");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"from",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/from?replyWithCloudEvent=true")
.convertBodyTo(String.class)
.setBody()
.constant("consumer")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
.constant("custom");
b.from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
.to("mock:to");
});
MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "custom");
mock.expectedMessageCount(1);
context.start();
template.sendBody("direct:source", "");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeServiceWithoutUrl(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"test",
null,
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/test")
.to("mock:start");
});
assertThatExceptionOfType(FailedToStartRouteException.class)
.isThrownBy(context::start)
.withCauseExactlyInstanceOf(RuntimeCamelException.class);
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"test",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/test")
.to("mock:start");
});
context.start();
Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
assertThat(exchange.isFailed()).isTrue();
assertThat(exchange.getException()).isInstanceOf(CamelException.class);
assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking http://localhost:" + platformHttpPort);
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testRemoveConsumer(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"ep1",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
)
),
sourceEndpoint(
"ep2",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/ep1")
.routeId("r1")
.setBody().simple("${routeId}");
b.from("knative:endpoint/ep2")
.routeId("r2")
.setBody().simple("${routeId}");
});
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader("h").body()
.toF("http://localhost:%d", platformHttpPort);
});
context.start();
assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
context.getRouteController().stopRoute("r2");
assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
assertThat(e.isFailed()).isTrue();
assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
});
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testAddConsumer(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"ep1",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
)
),
sourceEndpoint(
"ep2",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/ep1")
.routeId("r1")
.setBody().simple("${routeId}");
});
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader("h").body()
.toF("http://localhost:%d", platformHttpPort);
});
context.start();
assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
assertThat(e.isFailed()).isTrue();
assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
});
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/ep2")
.routeId("r2")
.setBody().simple("${routeId}");
});
assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testInvokeEndpointWithError(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/ep")
.to("mock:start");
b.fromF("platform-http:/")
.routeId("endpoint")
.process(e -> {
throw new RuntimeException("endpoint error");
});
});
context.start();
Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
assertThat(exchange.isFailed()).isTrue();
assertThat(exchange.getException()).isInstanceOf(CamelException.class);
assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking");
assertThat(exchange.getException()).hasMessageContaining("with statusCode: 500, statusMessage: Internal Server Error");
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEvents(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"default",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"default",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.to("knative:event/myEvent");
b.from("knative:event/myEvent")
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "myEvent");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEventsNoName(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"default",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"default",
Map.of(
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.to("knative:event");
b.from("knative:event")
.to("mock:ce");
});
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEventsWithResourceRef(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"default",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_OBJECT_KIND, "MyObject",
Knative.KNATIVE_OBJECT_API_VERSION, "v1",
Knative.KNATIVE_OBJECT_NAME, "myName1"
)),
sourceEvent(
"default",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_OBJECT_KIND, "MyOtherObject",
Knative.KNATIVE_OBJECT_API_VERSION, "v2",
Knative.KNATIVE_OBJECT_NAME, "myName2"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:source")
.to("knative:event/myEvent?kind=MyObject&apiVersion=v1&name=myName1");
b.from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2&name=myName2")
.to("mock:ce");
});
context.start();
assertThat(context.getEndpoint("knative:event/myEvent?kind=MyObject&apiVersion=v1&name=myName1", KnativeEndpoint.class)).satisfies(e -> {
assertThat(e.getType()).isEqualTo(Knative.Type.event);
assertThat(e.getTypeId()).isEqualTo("myEvent");
assertThat(e.getConfiguration().getTypeId()).isEqualTo("myEvent");
assertThat(e.getConfiguration().getName()).isEqualTo("myName1");
});
assertThat(context.getEndpoint("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2&name=myName2", KnativeEndpoint.class)).satisfies(e -> {
assertThat(e.getType()).isEqualTo(Knative.Type.event);
assertThat(e.getTypeId()).isEqualTo("myEvent");
assertThat(e.getConfiguration().getTypeId()).isEqualTo("myEvent");
assertThat(e.getConfiguration().getName()).isEqualTo("myName2");
});
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "myEvent");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_ID));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
template.sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testConsumeContentWithResourceRef(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_OBJECT_KIND, "MyObject",
Knative.KNATIVE_OBJECT_API_VERSION, "v1",
Knative.KNATIVE_OBJECT_NAME, "myName1"
)),
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_OBJECT_KIND, "MyObject",
Knative.KNATIVE_OBJECT_API_VERSION, "v2",
Knative.KNATIVE_OBJECT_NAME, "myName2"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2&name=myName2")
.to("mock:ce");
});
context.start();
assertThat(context.getEndpoint("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2&name=myName2", KnativeEndpoint.class)).satisfies(e -> {
assertThat(e.getType()).isEqualTo(Knative.Type.endpoint);
assertThat(e.getTypeId()).isEqualTo("myEndpoint");
assertThat(e.getConfiguration().getTypeId()).isEqualTo("myEndpoint");
assertThat(e.getConfiguration().getName()).isEqualTo("myName2");
});
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce.version());
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_ID, "myEventID");
mock.expectedHeaderReceived(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "/somewhere");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(CloudEvent.CAMEL_CLOUD_EVENT_TIME));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(200);
mock.assertIsSatisfied();
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testWrongMethod(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"myEndpoint",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/myEndpoint")
.to("mock:ce");
});
context.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.when()
.get()
.then()
.statusCode(405);
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoBody(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/myEndpoint");
});
context.start();
Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(null));
assertThat(exchange.isFailed()).isTrue();
assertThat(exchange.getException()).isInstanceOf(IllegalArgumentException.class);
assertThat(exchange.getException()).hasMessage("body must not be null");
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoContent(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
event.response().setStatusCode(204);
event.response().end("");
});
configureKnativeComponent(
context,
ce,
channel(
Knative.EndpointKind.source,
"messages",
null,
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
channel(
Knative.EndpointKind.sink,
"messages",
String.format("http://%s:%d", platformHttpHost, platformHttpPort),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
channel(
Knative.EndpointKind.sink,
"words",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
try {
server.start();
RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/messages")
.transform().simple("transformed ${body}")
.log("${body}")
.to("knative:channel/words");
});
context.start();
Exchange exchange = template.request("knative:channel/messages", e -> e.getMessage().setBody("message"));
assertThat(exchange.getMessage().getHeaders()).containsEntry(Exchange.HTTP_RESPONSE_CODE, 204);
assertThat(exchange.getMessage().getBody()).isNull();
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReply(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel?reply=false")
.setBody().constant(Map.of());
});
context.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReplyMeta(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_REPLY, "false"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel")
.setBody().constant(Map.of());
});
context.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoReplyMetaOverride(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
sourceChannel(
"channel",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_REPLY, "true"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:channel/channel?reply=false")
.setBody().constant(Map.of());
});
context.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204)
.body(is(emptyOrNullString()));
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeaders(CloudEvent ce) throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader("CamelDummyHeader").constant("test")
.to("knative:endpoint/ep")
.to("direct:mock");
b.from("direct:mock")
.to("mock:ep");
});
context.start();
try {
MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
mock.expectedHeaderReceived("CamelDummyHeader", "test");
mock.expectedMessageCount(1);
server.start();
template.sendBody("direct:start", "");
mock.assertIsSatisfied();
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event");
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isNotNull();
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersInReply(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader("CamelDummyHeader").constant("test")
.to("knative:endpoint/ep");
});
context.start();
try {
MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
mock.expectedHeaderReceived("CamelDummyHeader", "test");
mock.expectedMessageCount(1);
server.start();
Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody("test"));
assertThat(exchange.getMessage().getHeaders()).containsEntry("CamelDummyHeader", "test");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/ep");
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.toF("knative:endpoint/ep?%s=%s&%s=%s",
Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal);
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
KnativeComponent component = configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
component.getConfiguration().setCeOverride(Map.of(
Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
));
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:endpoint/ep");
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.routeId("my-source")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType")
.to("knative:endpoint/ep");
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("myType");
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("my-source");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.routeId("my-source-x")
.setHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE)).constant("fromCEHeader")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
.to("knative:endpoint/ep");
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("fromCEHeader");
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("my-source-x");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEventBridge(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"event.sink",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"event.source",
Map.of(
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:event/event.source")
.to("knative:event/event.sink");
});
context.start();
try {
server.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204);
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testDynamicEventBridge(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"default",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"event.source",
Map.of(
Knative.CONTENT_TYPE, "text/plain"
))
);
RouteBuilder.addRoutes(context, b -> {
b.from("knative:event/event.source")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("event.sink")
.to("knative:event");
});
context.start();
try {
server.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204);
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEventDefaultType(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
ce,
event(
Knative.EndpointKind.sink,
"default",
String.format("http://%s:%d", server.getHost(), server.getPort()),
Map.of(
Knative.CONTENT_TYPE, "text/plain"
)
)
);
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.to("knative:event");
});
context.start();
try {
server.start();
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event");
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isNotNull();
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testSlowConsumer(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
event.vertx().executeBlocking(
promise -> {
try {
Thread.sleep(5000);
promise.complete();
} catch (InterruptedException e) {
promise.fail(e);
}
},
false,
result -> {
event.response().setStatusCode(200);
event.response().end("");
}
);
});
configureKnativeComponent(
context,
ce,
sourceEndpoint(
"start",
Map.of(
Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
);
try {
server.start();
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/start")
.removeHeaders("Camel*")
.toF("http://%s:%d", server.getHost(), server.getPort());
});
context.start();
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
.header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(200);
} finally {
server.stop();
}
}
}