blob: 591a0cffe14eaadff0f5f7e92cda6b205dfd1972 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.journal.impl.publisher;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
@Component(immediate = true)
@ParametersAreNonnullByDefault
public class PackageDistributedNotifier implements TopologyChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
@Reference
private EventAdmin eventAdmin;
@Reference
private PubQueueCacheService pubQueueCacheService;
@Reference
private MessagingProvider messagingProvider;
@Reference
private Topics topics;
private JsonMessageSender<PackageDistributedMessage> sender;
private boolean sendMsg;
@Activate
public void activate() {
sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
if (sendMsg) {
sender = messagingProvider.createJsonSender();
}
LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
}
@Override
public void changed(TopologyView oldView, TopologyView newView) {
TopologyViewDiff diffView = new TopologyViewDiff(oldView, newView);
diffView.getProcessedOffsets().forEach(this::processOffsets);
}
/**
* @param pubAgentName the name of the publisher agent
* @param offsets range of offsets, from smallest offset to largest offset.
*/
private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
long minOffset = offsets.get().findFirst().getAsLong();
OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
offsets
.get()
.mapToObj(offsetQueue::getItem)
.filter(Objects::nonNull)
.forEach(msg -> processOffset(pubAgentName, msg));
}
protected void processOffset(String pubAgentName, DistributionQueueItem queueItem) {
sendEvt(pubAgentName, queueItem);
sendMsg(pubAgentName, queueItem);
}
private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
if (sendMsg) {
PackageDistributedMessage msg = new PackageDistributedMessage();
msg.pubAgentName = pubAgentName;
msg.packageId = queueItem.getPackageId();
msg.offset = (Long) queueItem.get(QueueItemFactory.RECORD_OFFSET);
msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS);
msg.deepPaths = (String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS);
sender.send(topics.getEventTopic(), msg);
}
}
private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
eventAdmin.sendEvent(distributed);
}
}