blob: 7ab2bdff007dd2f146708ad75f34a10b44f9111c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.journal.impl.publisher;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
/**
 * Unit tests for {@link DistributionPublisher}.
 *
 * <p>All collaborators of the publisher are Mockito mocks (or a spy, for {@link Topics}) that are
 * injected through {@code @InjectMocks}. The agent is activated before each test and deactivated
 * afterwards.
 */
public class DistributionPublisherTest {

    private static final String SUBAGENT1 = "subscriber-agent1";

    private static final String QUEUE_NAME = "5eb78e0f-e8a2-4589-97dd-21649b37a0da-" + SUBAGENT1;

    private static final String PUB1AGENT1 = "pub1agent1";

    @Mock
    private EventAdmin eventAdmin;

    @Mock
    private DiscoveryService discoveryService;

    @Mock
    private PubQueueProvider pubQueueProvider;

    @Mock
    private SlingSettingsService slingSettings;

    @Mock
    private MessagingProvider messagingProvider;

    @Mock
    private PackageMessageFactory factory;

    @Mock
    private DistributionPackageBuilder packageBuilder;

    @Mock
    private DistributionMetricsService distributionMetricsService;

    @Mock
    private Histogram histogram;

    @Mock
    private Meter meter;

    @Mock
    private Timer timer;

    @Mock
    private Timer.Context timerContext;

    @Mock
    private BundleContext context;

    /** Object under test; the mocks and the spy declared in this class are injected into it. */
    @InjectMocks
    private DistributionPublisher publisher;

    @Mock
    private ServiceRegistration<DistributionAgent> serviceReg;

    @Mock
    private ResourceResolver resourceResolver;

    @Mock
    private MessageSender<Messages.PackageMessage> sender;

    @Mock
    private PackageQueuedNotifier queuedNotifier;

    @Mock
    private TopologyView topology;

    /** Captures the package message handed to {@link #sender}. */
    @Captor
    private ArgumentCaptor<Messages.PackageMessage> pkgCaptor;

    @Spy
    private Topics topics = new Topics();

    /**
     * Initialises the mocks, stubs the collaborators consulted during activation and activates
     * the publisher under the agent name {@link #PUB1AGENT1}.
     */
    @SuppressWarnings("unchecked")
    @Before
    public void before() {
        MockitoAnnotations.initMocks(this);

        when(packageBuilder.getType()).thenReturn("journal");
        when(slingSettings.getSlingId()).thenReturn("pub1sling");
        when(messagingProvider.<Messages.PackageMessage>createSender()).thenReturn(sender);
        when(context.registerService(Mockito.eq(DistributionAgent.class), Mockito.eq(publisher),
                Mockito.any(Dictionary.class))).thenReturn(serviceReg);

        Map<String, String> agentProps = ImmutableMap.of("name", PUB1AGENT1);
        PublisherConfiguration agentConfig = Converters.standardConverter()
                .convert(agentProps)
                .to(PublisherConfiguration.class);
        publisher.activate(agentConfig, context);

        when(timer.time()).thenReturn(timerContext);
    }

    /** Deactivates the publisher and checks that its service registration was released. */
    @After
    public void after() {
        publisher.deactivate();
        verify(serviceReg).unregister();
    }

    /**
     * An ADD request is expected to be accepted, sent to the package topic and reported in the
     * agent log.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void testSend() throws DistributionException, IOException {
        DistributionRequest addRequest = new SimpleDistributionRequest(DistributionRequestType.ADD, "/test");
        Messages.PackageMessage expectedPkg = mockPackage(addRequest);
        when(factory.create(Mockito.any(DistributionPackageBuilder.class), Mockito.eq(resourceResolver),
                anyString(), Mockito.eq(addRequest))).thenReturn(expectedPkg);
        // An already completed future is handed out for the package id.
        CompletableFuture<Void> completed = CompletableFuture.completedFuture(null);
        when(queuedNotifier.registerWait(Mockito.eq(expectedPkg.getPkgId()))).thenReturn(completed);
        stubMetrics();

        DistributionResponse result = publisher.execute(resourceResolver, addRequest);

        assertThat(result.getState(), equalTo(DistributionRequestState.ACCEPTED));
        verify(sender).send(Mockito.eq(topics.getPackageTopic()), pkgCaptor.capture());
        // Individual fields are checked in the factory
        assertThat(pkgCaptor.getValue(), notNullValue());
        List<String> logLines = publisher.getLog().getLines();
        assertThat(logLines, contains(
                containsString("Started Publisher agent pub1agent1"),
                containsString("Distribution request accepted")));
    }

    /** A PULL request is expected to be dropped and reported as unsupported in the agent log. */
    @SuppressWarnings("unchecked")
    @Test
    public void testSendUnsupported() throws DistributionException, IOException {
        DistributionRequest pullRequest = new SimpleDistributionRequest(DistributionRequestType.PULL, "/test");

        DistributionResponse result = publisher.execute(resourceResolver, pullRequest);

        assertThat(result.getState(), equalTo(DistributionRequestState.DROPPED));
        List<String> logLines = publisher.getLog().getLines();
        assertThat(logLines, contains(
                containsString("Started Publisher agent pub1agent1"),
                containsString("Request type PULL is not supported by this agent, expected one of")));
    }

    /** With a subscriber state of max retries -1 only the main queue name is expected. */
    @Test
    public void testQueueNames() throws DistributionException, IOException {
        givenOneSubscribedAgent();
        givenSubscriberState(stateWithMaxRetries(-1));

        Iterable<String> queueNames = publisher.getQueueNames();

        assertThat(queueNames, contains(QUEUE_NAME));
    }

    /** With a subscriber state of max retries 1 the "-error" queue name is expected as well. */
    @Test
    public void testQueueNamesWithErrorQueue() throws DistributionException, IOException {
        givenOneSubscribedAgent();
        givenSubscriberState(stateWithMaxRetries(1));

        Iterable<String> queueNames = publisher.getQueueNames();

        assertThat(queueNames, containsInAnyOrder(QUEUE_NAME + "-error", QUEUE_NAME));
    }

    /** Requests the main queue and its "-error" queue; currently only checks nothing is thrown. */
    @Test
    public void testGetQueue() throws DistributionException, IOException {
        givenOneSubscribedAgent();
        givenSubscriberState(stateWithMaxRetries(1));

        publisher.getQueue(QUEUE_NAME);
        publisher.getQueue(QUEUE_NAME + "-error");
        // TODO Add assertions
    }

    /** A queue name that does not belong to the subscribed agent is expected to yield null. */
    @Test
    public void testGetWrongQueue() throws DistributionException, IOException {
        givenOneSubscribedAgent();

        DistributionQueue unknown = publisher.getQueue("i_am_not_a_queue");

        assertNull(unknown);
    }

    /** Stubs the discovery service so that the topology lists {@link #QUEUE_NAME} as subscribed. */
    private void givenOneSubscribedAgent() {
        when(discoveryService.getTopologyView()).thenReturn(topology);
        when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
    }

    /** Stubs the topology to report the given state for {@link #QUEUE_NAME}. */
    private void givenSubscriberState(State state) {
        when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
    }

    /** Stubs the metrics service accessors with the shared histogram, meter and timer mocks. */
    private void stubMetrics() {
        when(distributionMetricsService.getExportedPackageSize()).thenReturn(histogram);
        when(distributionMetricsService.getAcceptedRequests()).thenReturn(meter);
        when(distributionMetricsService.getDroppedRequests()).thenReturn(meter);
        when(distributionMetricsService.getBuildPackageDuration()).thenReturn(timer);
        when(distributionMetricsService.getEnqueuePackageDuration()).thenReturn(timer);
    }

    /** Creates a subscriber state for the test agents that differs only in the max retries value. */
    private State stateWithMaxRetries(int maxRetries) {
        return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false);
    }

    /**
     * Builds a real package message of type ADD for the paths of the given request, with a
     * 100 byte zero-filled binary payload.
     */
    private Messages.PackageMessage mockPackage(DistributionRequest request) throws IOException {
        List<String> requestPaths = Arrays.asList(request.getPaths());
        ByteString payload = ByteString.copyFrom(new byte[100]);
        return Messages.PackageMessage.newBuilder()
                .setPkgId("myid")
                .setPubSlingId("pub1sling")
                .setPkgType("journal")
                .setReqType(Messages.PackageMessage.ReqType.ADD)
                .addAllPaths(requestPaths)
                .addDeepPaths("/test2")
                .setPkgBinary(payload)
                .build();
    }
}