You can develop various plugins for Pulsar, such as entry filters, protocol handlers, and interceptors.
This chapter describes what an entry filter is and shows how to use it.
The entry filter is an extension point for implementing a custom message entry strategy. With an entry filter, you can decide whether to send messages to consumers, or send messages only to specific consumers. Brokers use the return values of entry filters to determine whether a message is sent or discarded.
To implement features such as tagged messages or custom delayed messages, use subscriptionProperties, properties, and entry filters.
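As a rough illustration of the tagged-message idea, the sketch below compares a tag stored in the subscription properties with a tag stored in the message properties. It is plain Java with no Pulsar dependency. The property key tag, the class name TagMatchSketch, and the rule that a subscription without a tag receives every message are assumptions made for this example, not Pulsar behavior.

```java
import java.util.Map;

// Hypothetical helper: decides whether a message matches a subscription's tag.
// The "tag" key is an assumption for this sketch, not a Pulsar convention.
class TagMatchSketch {
    static final String TAG_KEY = "tag";

    // Returns true when the message should be delivered to the subscription.
    static boolean matches(Map<String, String> subscriptionProperties,
                           Map<String, String> messageProperties) {
        String wanted = subscriptionProperties.get(TAG_KEY);
        if (wanted == null) {
            // Assumption: a subscription without a tag receives every message.
            return true;
        }
        // A message without a tag never matches a tagged subscription.
        return wanted.equals(messageProperties.get(TAG_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> subscription = Map.of("tag", "orders");
        System.out.println(matches(subscription, Map.of("tag", "orders")));
        System.out.println(matches(subscription, Map.of("tag", "billing")));
        System.out.println(matches(Map.of(), Map.of("tag", "billing")));
    }
}
```

In an actual entry filter, the two maps would be taken from the subscription and from the message metadata, and the boolean would be translated into the filter's return value.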
Follow the steps below:
Create a Maven project.
Implement the EntryFilter interface.
Package the implementation class into a NAR file.
Configure the broker.conf file (or the standalone.conf file) and restart your broker.
For how to create a Maven project, see here.
EntryFilter interface
Add a dependency on the Pulsar broker to the pom.xml file. Otherwise, the EntryFilter interface cannot be found.
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-broker</artifactId>
  <version>${pulsar.version}</version>
  <scope>provided</scope>
</dependency>
Implement the FilterResult filterEntry(Entry entry, FilterContext context); method.
If the method returns ACCEPT or null, the message is sent to consumers.
If the method returns REJECT, the message is filtered out and does not consume message permits.
If there are multiple entry filters, the message passes through the filters in the pipeline in turn. If any entry filter returns REJECT, the message is discarded.
You can get entry metadata, subscriptions, and other information through FilterContext.
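The return-value rules above can be modeled with a small, self-contained sketch. The types used here (Result, SimpleFilter, FilterChainSketch) are hypothetical stand-ins, not the Pulsar classes: a real filter would implement the EntryFilter interface and receive an Entry and a FilterContext rather than a plain string. The sketch also assumes that filters are consulted in order and that evaluation stops at the first REJECT.

```java
import java.util.List;

// Stand-in for the filter result; mirrors the ACCEPT and REJECT values described above.
enum Result { ACCEPT, REJECT }

// Stand-in for an entry filter; the payload is a plain string in this sketch.
interface SimpleFilter {
    Result filter(String payload);
}

class FilterChainSketch {
    // Returns true when the message should be sent to consumers.
    static boolean shouldDeliver(List<SimpleFilter> filters, String payload) {
        for (SimpleFilter f : filters) {
            Result r = f.filter(payload);
            // ACCEPT and null both let the message continue; REJECT discards it.
            if (r == Result.REJECT) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleFilter nonEmpty = p -> p.isEmpty() ? Result.REJECT : Result.ACCEPT;
        SimpleFilter noOpinion = p -> null; // treated like ACCEPT
        SimpleFilter noDebug = p -> p.startsWith("debug:") ? Result.REJECT : Result.ACCEPT;
        List<SimpleFilter> chain = List.of(nonEmpty, noOpinion, noDebug);

        System.out.println(shouldDeliver(chain, "order-42"));
        System.out.println(shouldDeliver(chain, "debug:trace"));
    }
}
```

With this chain, a payload is delivered only when no filter rejects it; a filter that returns null has no effect on the outcome.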
Describe a NAR file.
Create an entry_filter.yaml file in the resources/META-INF/services directory to describe a NAR file.
# Entry filter name, which should be configured in the broker.conf file later
name: entryFilter
# Entry filter description
description: entry filter
# Implementation class name of entry filter
entryFilterClass: com.xxxx.xxxx.xxxx.DefaultEntryFilterImpl
Add the NAR Maven plugin to your pom.xml file so that the build packages the project as a NAR file.
<build>
  <finalName>${project.artifactId}</finalName>
  <plugins>
    <plugin>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-nar-maven-plugin</artifactId>
      <version>1.2.0</version>
      <extensions>true</extensions>
      <configuration>
        <finalName>${project.artifactId}-${project.version}</finalName>
      </configuration>
      <executions>
        <execution>
          <id>default-nar</id>
          <phase>package</phase>
          <goals>
            <goal>nar</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Generate a NAR file in the target directory.
mvn clean install
Configure the following parameters in the broker.conf file (or the standalone.conf file).
# Class name of pluggable entry filters
# Multiple classes need to be separated by commas.
entryFilterNames=entryFilter1,entryFilter2,entryFilter3
# The directory for all entry filter implementations
entryFiltersDirectory=tempDir
Restart your broker.
If the plugin is loaded successfully, the following message appears in the broker log.
Successfully loaded entry filter for name `{name of your entry filter}`