| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.salesforce; |
| |
| import java.time.ZonedDateTime; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Objects; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import org.apache.camel.AsyncProcessor; |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.component.salesforce.api.dto.PlatformEvent; |
| import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper; |
| import org.apache.camel.spi.ClassResolver; |
| import org.cometd.bayeux.Message; |
| import org.cometd.bayeux.client.ClientSessionChannel; |
| import org.cometd.common.HashMapMessage; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnitRunner; |
| |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.ArgumentMatchers.same; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.verifyNoMoreInteractions; |
| import static org.mockito.Mockito.when; |
| |
| @RunWith(MockitoJUnitRunner.class) |
| public class SalesforceConsumerTest { |
| |
| public static class AccountUpdates { |
| @JsonProperty("Id") |
| String id; |
| |
| @JsonProperty("Name") |
| String name; |
| |
| @JsonProperty("Phone") |
| String phone; |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (!(obj instanceof AccountUpdates)) { |
| return false; |
| } |
| |
| final AccountUpdates other = (AccountUpdates)obj; |
| |
| return Objects.equals(id, other.id) && Objects.equals(name, other.name) && Objects.equals(phone, other.phone); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(id, name, phone); |
| } |
| |
| } |
| |
| static final SubscriptionHelper NOT_USED = null; |
| |
| SalesforceEndpointConfig configuration = new SalesforceEndpointConfig(); |
| SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); |
| Exchange exchange = mock(Exchange.class); |
| org.apache.camel.Message in = mock(org.apache.camel.Message.class); |
| AsyncProcessor processor = mock(AsyncProcessor.class); |
| Message pushTopicMessage; |
| |
| @Mock |
| private Message mockChangeEvent; |
| @Mock |
| private Map<String, Object> mockChangeEventPayload; |
| @Mock |
| private Map<String, Object> mockChangeEventData; |
| @Mock |
| private Map<String, Object> mockChangeEventMap; |
| |
| @Before |
| public void setupMocks() { |
| when(endpoint.getConfiguration()).thenReturn(configuration); |
| when(endpoint.createExchange()).thenReturn(exchange); |
| when(exchange.getIn()).thenReturn(in); |
| final SalesforceComponent component = mock(SalesforceComponent.class); |
| when(endpoint.getComponent()).thenReturn(component); |
| final CamelContext camelContext = mock(CamelContext.class); |
| when(component.getCamelContext()).thenReturn(camelContext); |
| final ClassResolver classResolver = mock(ClassResolver.class); |
| when(camelContext.getClassResolver()).thenReturn(classResolver); |
| when(classResolver.resolveClass(AccountUpdates.class.getName())).thenReturn((Class)AccountUpdates.class); |
| |
| pushTopicMessage = createPushTopicMessage(); |
| |
| setupMockChangeEvent(); |
| } |
| |
| private void setupMockChangeEvent() { |
| final Map<String, Object> changeEventHeader = new HashMap<>(); |
| changeEventHeader.put("changeType", "CREATE"); |
| changeEventHeader.put("changeOrigin", "com/salesforce/api/rest/45.0"); |
| changeEventHeader.put("transactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8"); |
| changeEventHeader.put("sequenceNumber", 1L); |
| changeEventHeader.put("isTransactionEnd", Boolean.TRUE); |
| changeEventHeader.put("commitTimestamp", 1558949299000L); |
| changeEventHeader.put("commitUser", "0052p000009cl8uBBB"); |
| changeEventHeader.put("commitNumber", 10585193272713L); |
| changeEventHeader.put("entityName", "Account"); |
| changeEventHeader.put("recordIds", new Object[] {"0012p00002HHpNlAAL"}); |
| |
| when(mockChangeEventPayload.get("ChangeEventHeader")).thenReturn(changeEventHeader); |
| |
| when(mockChangeEventMap.get("replayId")).thenReturn(4L); |
| |
| when(mockChangeEventData.get("schema")).thenReturn("30H2pgzuWcF844p26Ityvg"); |
| when(mockChangeEventData.get("payload")).thenReturn(mockChangeEventPayload); |
| when(mockChangeEventData.get("event")).thenReturn(mockChangeEventMap); |
| |
| when(mockChangeEvent.getDataAsMap()).thenReturn(mockChangeEventData); |
| when(mockChangeEvent.getChannel()).thenReturn("/data/AccountChangeEvent"); |
| } |
| |
| @Test |
| public void shouldProcessMappedPayloadPushTopicMessages() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("AccountUpdates"); |
| configuration.setSObjectClass(AccountUpdates.class.getName()); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage); |
| |
| final AccountUpdates accountUpdates = new AccountUpdates(); |
| accountUpdates.phone = "(415) 555-1212"; |
| accountUpdates.id = "001D000000KneakIAB"; |
| accountUpdates.name = "Blackbeard"; |
| |
| verify(in).setBody(accountUpdates); |
| verify(in).setHeader("CamelSalesforceEventType", "created"); |
| verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z"); |
| verify(in).setHeader("CamelSalesforceReplayId", 1L); |
| verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k"); |
| |
| verify(processor).process(same(exchange), any()); |
| } |
| |
| @Test |
| public void shouldProcessPlatformEvents() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e"); |
| |
| final Message message = new HashMapMessage(); |
| final Map<String, Object> data = new HashMap<>(); |
| data.put("schema", "30H2pgzuWcF844p26Ityvg"); |
| |
| final Map<String, Object> payload = new HashMap<>(); |
| payload.put("Test_Field__c", "abc"); |
| payload.put("CreatedById", "00541000002WYFpAAO"); |
| payload.put("CreatedDate", "2018-07-06T12:41:04Z"); |
| data.put("payload", payload); |
| data.put("event", Collections.singletonMap("replayId", 4L)); |
| message.put("data", data); |
| message.put("channel", "/event/TestEvent__e"); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), message); |
| |
| final ZonedDateTime created = ZonedDateTime.parse("2018-07-06T12:41:04Z"); |
| final PlatformEvent event = new PlatformEvent(created, "00541000002WYFpAAO"); |
| event.set("Test_Field__c", "abc"); |
| verify(in).setBody(event); |
| verify(in).setHeader("CamelSalesforceCreatedDate", created); |
| verify(in).setHeader("CamelSalesforceReplayId", 4L); |
| verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e"); |
| verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e"); |
| verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg"); |
| |
| verify(processor).process(same(exchange), any()); |
| |
| verifyNoMoreInteractions(in, processor); |
| } |
| |
| @Test |
| public void shouldProcessPushTopicMessages() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("AccountUpdates"); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage); |
| |
| @SuppressWarnings("unchecked") |
| final Object sobject = ((Map<String, Object>)pushTopicMessage.get("data")).get("sobject"); |
| verify(in).setBody(sobject); |
| verify(in).setHeader("CamelSalesforceEventType", "created"); |
| verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z"); |
| verify(in).setHeader("CamelSalesforceReplayId", 1L); |
| verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k"); |
| |
| verify(processor).process(same(exchange), any()); |
| } |
| |
| @Test |
| public void shouldProcessRawPayloadPushTopicMessages() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("AccountUpdates"); |
| configuration.setRawPayload(true); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage); |
| |
| verify(in).setBody("{\"Phone\":\"(415) 555-1212\",\"Id\":\"001D000000KneakIAB\",\"Name\":\"Blackbeard\"}"); |
| verify(in).setHeader("CamelSalesforceEventType", "created"); |
| verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z"); |
| verify(in).setHeader("CamelSalesforceReplayId", 1L); |
| verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates"); |
| verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k"); |
| |
| verify(processor).process(same(exchange), any()); |
| } |
| |
| @Test |
| public void shouldProcessRawPlatformEvents() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e"); |
| configuration.setRawPayload(true); |
| |
| final Message message = new HashMapMessage(); |
| final Map<String, Object> data = new HashMap<>(); |
| data.put("schema", "30H2pgzuWcF844p26Ityvg"); |
| |
| final Map<String, Object> payload = new HashMap<>(); |
| payload.put("Test_Field__c", "abc"); |
| payload.put("CreatedById", "00541000002WYFpAAO"); |
| payload.put("CreatedDate", "2018-07-06T12:41:04Z"); |
| data.put("payload", payload); |
| data.put("event", Collections.singletonMap("replayId", 4L)); |
| message.put("data", data); |
| message.put("channel", "/event/TestEvent__e"); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), message); |
| |
| verify(in).setBody(message); |
| verify(in).setHeader("CamelSalesforceCreatedDate", ZonedDateTime.parse("2018-07-06T12:41:04Z")); |
| verify(in).setHeader("CamelSalesforceReplayId", 4L); |
| verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e"); |
| verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e"); |
| verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg"); |
| |
| verify(processor).process(same(exchange), any()); |
| |
| verifyNoMoreInteractions(in, processor); |
| } |
| |
| @Test |
| public void shouldProcessChangeEvents() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent"); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent); |
| |
| verify(in).setBody(mockChangeEventPayload); |
| verify(in).setHeader("CamelSalesforceChannel", "/data/AccountChangeEvent"); |
| verify(in).setHeader("CamelSalesforceReplayId", 4L); |
| verify(in).setHeader("CamelSalesforceChangeEventSchema", "30H2pgzuWcF844p26Ityvg"); |
| verify(in).setHeader("CamelSalesforceEventType", "AccountChangeEvent"); |
| verify(in).setHeader("CamelSalesforceChangeType", "CREATE"); |
| verify(in).setHeader("CamelSalesforceChangeOrigin", "com/salesforce/api/rest/45.0"); |
| verify(in).setHeader("CamelSalesforceTransactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8"); |
| verify(in).setHeader("CamelSalesforceSequenceNumber", 1L); |
| verify(in).setHeader("CamelSalesforceIsTransactionEnd", Boolean.TRUE); |
| verify(in).setHeader("CamelSalesforceCommitTimestamp", 1558949299000L); |
| verify(in).setHeader("CamelSalesforceCommitUser", "0052p000009cl8uBBB"); |
| verify(in).setHeader("CamelSalesforceCommitNumber", 10585193272713L); |
| verify(in).setHeader("CamelSalesforceEntityName", "Account"); |
| verify(in).setHeader("CamelSalesforceRecordIds", new Object[] {"0012p00002HHpNlAAL"}); |
| |
| verify(mockChangeEventPayload).remove("ChangeEventHeader"); |
| |
| verify(processor).process(same(exchange), any()); |
| |
| verifyNoMoreInteractions(in, processor); |
| } |
| |
| @Test |
| public void processNoReplayIdChangeEventsShouldNotSetReplayIdHeader() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent"); |
| when(mockChangeEventMap.get("replayId")).thenReturn(null); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent); |
| |
| verify(in, never()).setHeader(eq("CamelSalesforceReplayId"), any()); |
| } |
| |
| @Test |
| public void processRawPayloadChangeEventsShouldSetInputMessageAsBody() throws Exception { |
| when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent"); |
| configuration.setRawPayload(true); |
| |
| final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED); |
| |
| consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent); |
| |
| verify(in).setBody(mockChangeEvent); |
| } |
| |
| static Message createPushTopicMessage() { |
| final Message pushTopicMessage = new HashMapMessage(); |
| pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k"); |
| |
| final Map<String, Object> data = new HashMap<>(); |
| pushTopicMessage.put("data", data); |
| |
| final Map<String, Object> event = new HashMap<>(); |
| data.put("event", event); |
| |
| event.put("createdDate", "2016-09-16T19:45:27.454Z"); |
| event.put("replayId", 1L); |
| event.put("type", "created"); |
| |
| final Map<String, Object> sobject = new HashMap<>(); |
| data.put("sobject", sobject); |
| |
| sobject.put("Phone", "(415) 555-1212"); |
| sobject.put("Id", "001D000000KneakIAB"); |
| sobject.put("Name", "Blackbeard"); |
| |
| pushTopicMessage.put("channel", "/topic/AccountUpdates"); |
| return pushTopicMessage; |
| } |
| } |