blob: 3a04589988b9448c310960bb2ee7a7fb5a6baaeb [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.pubsub;
import org.apache.edgent.connectors.pubsub.oplets.Publish;
import org.apache.edgent.connectors.pubsub.service.PublishSubscribeService;
import org.apache.edgent.execution.services.RuntimeServices;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.TopologyElement;
/**
* Publish subscribe model.
* <BR>
* A stream can be {@link #publish(TStream, String, Class) published } to a topic
* and then {@link #subscribe(TopologyElement, String, Class) subscribed} to by
* other topologies.
*
* <P>
* A published topic has a type and subscribers must subscribe using the same
* topic and type (inheritance matching is not supported).
* <BR>
* Multiple streams from different topologies can be published to
* same topic (and type) and there can be multiple subscribers
* from different topologies.
* <BR>
* A subscriber can exist before a publisher exists, they are connected
* automatically when the job starts.
* </P>
* <P>
* If no {@link PublishSubscribeService} is registered then published
* tuples are discarded and subscribers see no tuples.
* </P>
* <P>
* The recommended style for topics is MQTT topics, where {@code /}
* is used to provide a hierarchy into topics. For example {@code engine/sensors/temperature}
* might be a topic that represents temperature sensors in an engine.
* <BR>
* Topics that start with {@code edgent/} are reserved for use by Edgent.
* <BR>
* MQTT style wild-cards are not supported.
* </P>
*/
public class PublishSubscribe {
/**
* Topics that start with {@value} are reserved for use by Edgent.
*/
public static final String RESERVED_TOPIC_PREFIX= "edgent/";
/**
* Publish this stream to a topic.
* This is a model that allows jobs to subscribe to
* streams published by other jobs.
*
* @param <T> Tuple type
* @param stream stream to publish
* @param topic Topic to publish to.
* @param streamType Type of objects on the stream.
* @return sink element representing termination of this stream.
*
* @see #subscribe(TopologyElement, String, Class)
*/
public static <T> TSink<T> publish(TStream<T> stream, String topic, Class<? super T> streamType) {
return stream.sink(new Publish<>(topic, streamType));
}
/**
* Subscribe to a published topic.
* This is a model that allows jobs to subscribe to
* streams published by other jobs.
* @param <T> Tuple type
* @param te TopologyElement whose Topology to add to
* @param topic Topic to subscribe to.
* @param streamType Type of the stream.
* @return Stream containing published tuples.
*
* @see #publish(TStream, String, Class)
*/
public static <T> TStream<T> subscribe(TopologyElement te, String topic, Class<T> streamType) {
Topology topology = te.topology();
Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
return te.topology().events(new SubscriberSetup<T>(topic, streamType, rts));
}
/**
* Subscriber setup function that adds a subscriber on
* start up and removes it on close.
*
* @param <T> Type of the tuples.
*/
private static final class SubscriberSetup<T> implements Consumer<Consumer<T>>, AutoCloseable{
private static final long serialVersionUID = 1L;
private final Supplier<RuntimeServices> rts;
private final String topic;
private final Class<T> streamType;
private Consumer<T> submitter;
SubscriberSetup(String topic, Class<T> streamType, Supplier<RuntimeServices> rts) {
this.topic = topic;
this.streamType = streamType;
this.rts = rts;
}
@Override
public void accept(Consumer<T> submitter) {
PublishSubscribeService pubSub = rts.get().getService(PublishSubscribeService.class);
if (pubSub != null) {
this.submitter = submitter;
pubSub.addSubscriber(topic, streamType, submitter);
}
}
@Override
public void close() throws Exception {
PublishSubscribeService pubSub = rts.get().getService(PublishSubscribeService.class);
if (pubSub != null) {
pubSub.removeSubscriber(topic, submitter);
}
}
}
}