[AMQ-9455] DestinationPolicy support for MessageInterceptorStrategy
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 1426f33..4ca3913 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -28,6 +28,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -35,7 +36,6 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
@@ -99,6 +99,7 @@
     private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
     protected int cursorMemoryHighWaterMark = 70;
     protected int storeUsageHighWaterMark = 100;
+    private MessageInterceptorStrategy messageInterceptorStrategy;
     private SlowConsumerStrategy slowConsumerStrategy;
     private boolean prioritizedMessages;
     private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
@@ -942,4 +943,12 @@
     public SystemUsage getSystemUsage() {
         return systemUsage;
     }
+
+    public MessageInterceptorStrategy getMessageInterceptorStrategy() {
+        return this.messageInterceptorStrategy;
+    }
+
+    public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
+        this.messageInterceptorStrategy = messageInterceptorStrategy;
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f6cfd04..6502a20 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -48,6 +48,8 @@
 
 import jakarta.jms.InvalidSelectorException;
 import jakarta.jms.JMSException;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageFormatRuntimeException;
 import jakarta.jms.ResourceAllocationException;
 
 import org.apache.activemq.broker.BrokerService;
@@ -625,6 +627,15 @@
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                 && !context.isInRecoveryMode();
+
+        if(getMessageInterceptorStrategy() != null) {
+            try {
+                getMessageInterceptorStrategy().process(producerExchange, message);
+            } catch (MessageFormatRuntimeException e) {
+                throw new MessageFormatException(e.getMessage(), e.getErrorCode());
+            }
+        }
+
         if (message.isExpired()) {
             // message not stored - or added to stats yet - so chuck here
             broker.getRoot().messageExpired(context, message, null);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 61ecb7a..cad0d3b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -64,6 +64,8 @@
 import org.slf4j.LoggerFactory;
 
 import jakarta.jms.JMSException;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageFormatRuntimeException;
 
 import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
 
@@ -371,6 +373,14 @@
 
         message.setRegionDestination(this);
 
+        if(getMessageInterceptorStrategy() != null) {
+            try {
+                getMessageInterceptorStrategy().process(producerExchange, message);
+            } catch (MessageFormatRuntimeException e) {
+                throw new MessageFormatException(e.getMessage(), e.getErrorCode());
+            }
+        }
+
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if (message.isExpired()) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java
new file mode 100644
index 0000000..729544d
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageInterceptorStrategy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+
+import jakarta.jms.MessageFormatRuntimeException;
+
+/**
+ * Configurable chain of MessageInterceptorStrategies 
+ *
+ * @org.apache.xbean.XBean
+ */
+public class ChainMessageInterceptorStrategy implements MessageInterceptorStrategy {
+
+    private MessageInterceptorStrategy[] messageInterceptorStrategies;
+
+    @Override
+    public void process(ProducerBrokerExchange producerBrokerExchange, Message message) throws MessageFormatRuntimeException {
+        if(messageInterceptorStrategies == null || messageInterceptorStrategies.length == 0) {
+            return;
+        }
+
+        Arrays.stream(messageInterceptorStrategies).forEach(m -> m.process(producerBrokerExchange, message));
+    }
+
+    public void setMessageStrategies(MessageInterceptorStrategy[] messageInterceptorStrategies) {
+        this.messageInterceptorStrategies = messageInterceptorStrategies;
+    }
+
+    public MessageInterceptorStrategy[] getMessageInterceptorStrategies() {
+        return this.messageInterceptorStrategies;
+    }
+}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java
new file mode 100644
index 0000000..2a651d8
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageInterceptorStrategy.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jakarta.jms.MessageFormatRuntimeException;
+
+/**
+ * Enforce message policies for JMS Header values
+ *
+ * @org.apache.xbean.XBean
+ */
+public class HeaderMessageInterceptorStrategy implements MessageInterceptorStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageInterceptorStrategy.class);
+
+    boolean forceDeliveryMode = false;
+
+    boolean persistent = true;
+
+    boolean forceExpiration = false;
+
+    /**
+    * variable which (when non-zero) is used to override
+    * the expiration date for messages that arrive with
+    * no expiration date set (in Milliseconds).
+    */
+    long zeroExpirationOverride = 0;
+
+    /**
+    * variable which (when non-zero) is used to limit
+    * the expiration date (in Milliseconds).
+    */
+    long expirationCeiling = 0;
+
+    /**
+     * If true, the plugin will not update timestamp to past values
+     * False by default
+     */
+    boolean futureOnly = false;
+
+    /**
+     * if true, update timestamp even if message has passed through a network
+     * default false
+     */
+    boolean processNetworkMessages = false;
+
+    /**
+    * setter method for zeroExpirationOverride
+    */
+    public void setZeroExpirationOverride(long ttl)
+    {
+        this.zeroExpirationOverride = ttl;
+    }
+
+    /**
+    * setter method for expirationCeiling
+    */
+    public void setExpirationCeiling(long expirationCeiling)
+    {
+        this.expirationCeiling = expirationCeiling;
+    }
+
+    public void setFutureOnly(boolean futureOnly) {
+        this.futureOnly = futureOnly;
+    }
+
+    public void setProcessNetworkMessages(Boolean processNetworkMessages) {
+        this.processNetworkMessages = processNetworkMessages;
+    }
+
+    @Override
+    public void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException {
+        if(!isProcessNetworkMessages() && producerBrokerExchange.getConnectionContext().isNetworkConnection()) {
+            // Message passed through a network and processNetworkMessages=true is not set
+            return;
+        }
+
+        if(isForceExpiration()) {
+            if (message.getTimestamp() > 0 && !message.getDestination().isDLQ()) {
+                 long oldExpiration = message.getExpiration();
+                 long newTimeStamp = System.currentTimeMillis();
+                 long timeToLive = zeroExpirationOverride;
+                 long oldTimestamp = message.getTimestamp();
+                 if (oldExpiration > 0) {
+                     timeToLive = oldExpiration - oldTimestamp;
+                 }
+                 if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) {
+                     timeToLive = expirationCeiling;
+                 }
+                 long expiration = timeToLive + newTimeStamp;
+                 // In the scenario that the Broker is behind the clients we never want to set the
+                 // Timestamp and Expiration in the past
+                 if(!futureOnly || (expiration > oldExpiration)) {
+                     if (timeToLive > 0 && expiration > 0) {
+                         message.setExpiration(expiration);
+                     }
+                     message.setTimestamp(newTimeStamp);
+                     LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp);
+                 }
+            }
+        }
+
+        if(forceDeliveryMode) {
+            message.setPersistent(isPersistent());
+        }
+    }
+
+    public void setForceDeliveryMode(boolean forceDeliveryMode) {
+        this.forceDeliveryMode = forceDeliveryMode;
+    }
+
+    public boolean isForceDeliveryMode() {
+        return this.forceDeliveryMode;
+    }
+
+    public void setForceExpiration(boolean forceExpiration) {
+        this.forceExpiration = forceExpiration;
+    }
+
+    public boolean isForceExpiration() {
+        return this.forceExpiration;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public boolean isPersistent() {
+        return this.persistent;
+    }
+
+    public void setProcessNetworkMessages(boolean processNetworkMessages) {
+        this.processNetworkMessages = processNetworkMessages;
+    }
+
+    public boolean isProcessNetworkMessages() {
+        return this.processNetworkMessages;
+    }
+}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java
new file mode 100644
index 0000000..8969328
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageInterceptorStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import jakarta.jms.MessageFormatRuntimeException;
+
+public interface MessageInterceptorStrategy {
+ 
+    /**
+     * When a PolicyEntry is configured with a MessageInterceptorStrategy, the 
+     * process method is invoked with the current ProducerBrokerExchange and Message before
+     * the message is stored in any destination cache or persistence store.
+     * 
+     * Implementations may reference data from the ProducerBrokerExchange and may check or
+     * modify headers, properties, body or other metadata on the Message.
+     * 
+     * Any change to the message must adhere to OpenWire and ActiveMQ requirements or risk
+     * issues with memory usage, compatibility, and general correct functioning.
+     * 
+     * Implementations shall not copy, or clone the message.
+     * 
+     * Implementations may throw a <tt>MessageFormatRuntimeException</tt>
+     * that is returned to the client to indicate a message should not be added to the queue.
+     * 
+     * @param producerBrokerExchange
+     * @param message
+     * @return
+     * @throws MessageFormatRuntimeException
+     */
+    void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException;
+
+}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 51b20e3..7230957 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -115,6 +115,7 @@
     private int sendFailIfNoSpace = -1;
     private long sendFailIfNoSpaceAfterTimeout = -1;
 
+    private MessageInterceptorStrategy messageInterceptorStrategy = null;
 
     public void configure(Broker broker,Queue queue) {
         baseConfiguration(broker,queue);
@@ -139,6 +140,7 @@
         queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
         queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
         queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
+        queue.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
     }
 
     public void update(Queue queue) {
@@ -201,6 +203,7 @@
             topic.getMemoryUsage().setLimit(memoryLimit);
         }
         topic.setLazyDispatch(isLazyDispatch());
+        topic.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
     }
 
     public void update(Topic topic) {
@@ -1165,4 +1168,12 @@
     public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
         this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
     }
+
+    public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
+        this.messageInterceptorStrategy = messageInterceptorStrategy;
+    }
+
+    public MessageInterceptorStrategy getMessageInterceptorStrategy() {
+        return this.messageInterceptorStrategy;
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
new file mode 100644
index 0000000..34ccf3d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.io.File;
+import java.util.Random;
+
+import jakarta.jms.BytesMessage;
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageFormatRuntimeException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * This unit test is to test that MessageInterceptorStrategy features
+ *
+ */
+public class MessageInterceptorStrategyMemoryUsageTest extends TestSupport {
+
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    QueueBrowser queueBrowser;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+
+        File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
+                .get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if(producer != null) {
+            producer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    /**
+     * Test sending messages that have body modified have correct usage
+     * 
+     * Start with 10x 1k message bodies that get increased to 1mb
+     */
+    @Test
+    public void testMemoryUsageBodyIncrease() throws Exception {
+        applyHeaderMessageInterceptor(1*1024*1024);
+        String queueName = "mis.bodySize.increase";
+        Queue queue = createQueue(queueName);
+
+        for (int i=0; i<10; i++) {
+            BytesMessage sendMessageP = session.createBytesMessage();
+            byte[] origBody = new byte[1*1024];
+            sendMessageP.writeBytes(origBody);
+            producer.send(queue, sendMessageP);
+        }
+
+        QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
+        assertEquals(Long.valueOf(10_496_000l), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
+    }
+
+    /**
+     * Test sending messages that have body modified have correct usage
+     * 
+     * Start with 10x 1mb message bodies that get decreased to 1kb
+     */
+    @Test
+    public void testMemoryUsageBodyDecrease() throws Exception {
+        applyHeaderMessageInterceptor(1*1024);
+        String queueName = "mis.bodySize.decrease";
+        Queue queue = createQueue(queueName);
+
+        for (int i=0; i<10; i++) {
+            BytesMessage sendMessageP = session.createBytesMessage();
+            byte[] origBody = new byte[1*1024*1024];
+            sendMessageP.writeBytes(origBody);
+            producer.send(queue, sendMessageP);
+        }
+
+        QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
+        assertEquals(Long.valueOf(20_480), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
+    }
+
+    private PolicyMap applyHeaderMessageInterceptor(final int bodySize) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+
+        MessageInterceptorStrategy bodySizeMessageInterceptorStrategy= new MessageInterceptorStrategy() {
+
+            @Override
+            public void process(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws MessageFormatRuntimeException {
+                if(bodySize > 0) {
+                    try {
+                        message.clearBody();
+                    } catch (JMSException e) {
+                        fail(e.getMessage());
+                    }
+                    byte[] newBody = new byte[bodySize];
+                    new Random().nextBytes(newBody);
+                    message.setContent(new ByteSequence(newBody));
+                    message.storeContent();
+                }
+            }
+        };
+        defaultEntry.setMessageInterceptorStrategy(bodySizeMessageInterceptorStrategy);
+
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    private Queue createQueue(String queueName) throws Exception {
+        Queue queue = session.createQueue(queueName);
+        producer = session.createProducer(queue);
+        return queue;
+    }
+
+}
\ No newline at end of file
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
new file mode 100644
index 0000000..ae50d26
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Enumeration;
+
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.HeaderMessageInterceptorStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.test.TestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * This unit test is to test that MessageInterceptorStrategy features
+ *
+ */
+public class MessageInterceptorStrategyTest extends TestSupport {
+
+    BrokerService broker;
+    ConnectionFactory factory;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    QueueBrowser queueBrowser;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+
+        File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
+        broker.setDataDirectoryFile(testDataDir);
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
+                .get(0).getConnectUri().toString());
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if(producer != null) {
+            producer.close();
+        }
+        session.close();
+        connection.stop();
+        connection.close();
+        broker.stop();
+    }
+
+    /**
+     * Test sending messages can be forced to Persistent
+     */
+    @Test
+    public void testForceDeliveryModePersistent() throws Exception {
+        applyHeaderMessageInterceptor(true, true, false, 0l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceDeliveryMode.true");
+        Message sendMessageP = session.createTextMessage("forceDeliveryMode=true");
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(queue, sendMessageP);
+
+        Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true");
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(queue, sendMessageNP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+            count++;
+        }
+        assertEquals(Integer.valueOf(2), Integer.valueOf(count));
+    }
+
+    /**
+     * Test sending messages can be forced to NonPersistent
+     */
+    @Test
+    public void testForceDeliveryModeNonPersistent() throws Exception {
+        applyHeaderMessageInterceptor(true, false, false, 0l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceDeliveryMode.false");
+        Message sendMessageP = session.createTextMessage("forceDeliveryMode=false");
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(queue, sendMessageP);
+
+        Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false");
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.send(queue, sendMessageNP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+            count++;
+        }
+        assertEquals(Integer.valueOf(2), Integer.valueOf(count));
+    }
+
+    /**
+     * Test not overriding expiration
+     */
+    @Test
+    public void testForceExpirationDisabled() throws Exception {
+        applyHeaderMessageInterceptor(false, false, false, 100_000l, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceExpiration.zero");
+        Message sendMessageP = session.createTextMessage("expiration=zero");
+        producer.setTimeToLive(0l);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSExpiration()));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    /**
+     * Test overriding zero (0) expiration
+     */
+    @Test
+    public void testForceExpirationZeroOverride() throws Exception {
+        long expiryTime = 100_000l;
+        applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
+
+        long currentTime = System.currentTimeMillis();
+        Queue queue = createQueue("mis.forceExpiration.100k");
+        Message sendMessageP = session.createTextMessage("expiration=zero");
+        producer.setTimeToLive(100_000l);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertTrue(Long.valueOf(message.getJMSExpiration()) > currentTime +  (expiryTime / 2));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    /**
+     * Test overriding zero (0) expiration
+     */
+    @Test
+    public void testForceExpirationZeroOverrideDLQ() throws Exception {
+        long expiryTime = 1l;
+        applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
+
+        Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry");
+        Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry");
+        producer.send(queue, sendMessageP);
+
+        Thread.sleep(250l);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            count++;
+        }
+        assertEquals(Integer.valueOf(0), Integer.valueOf(count));
+
+        QueueBrowser dlqQueueBrowser = session.createBrowser(createQueue("mis.forceExpiration.zero-no-dlq-expiry.dlq"));
+        Enumeration<?> dlqBrowseEnumeration = dlqQueueBrowser.getEnumeration();
+
+        int dlqCount = 0;
+        while(dlqBrowseEnumeration.hasMoreElements()) {
+            Message dlqMessage = (Message)dlqBrowseEnumeration.nextElement();
+            assertEquals(sendMessageP.getJMSMessageID(), dlqMessage.getJMSMessageID());
+            assertEquals("Expiration should be zero" + dlqMessage.getJMSExpiration() + "\n", dlqMessage.getJMSExpiration(), 0);
+            dlqCount++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(dlqCount));
+    }
+
+    /**
+     * Test overriding expiration ceiling
+     */
+    @Test
+    public void testForceExpirationCeilingOverride() throws Exception {
+        long zeroOverrideExpiryTime = 100_000l;
+        long expirationCeiling = Duration.ofDays(1).toMillis();
+        applyHeaderMessageInterceptor(false, false, true, zeroOverrideExpiryTime, expirationCeiling);
+
+        long currentTime = System.currentTimeMillis();
+        long expiryTime = Duration.ofDays(10).toMillis();
+        Queue queue = createQueue("mis.forceExpiration.maxValue");
+        Message sendMessageP = session.createTextMessage("expiration=ceiling");
+        producer.setTimeToLive(expiryTime);
+        producer.send(queue, sendMessageP);
+
+        queueBrowser = session.createBrowser(queue);
+        Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
+
+        int count = 0;
+        while(browseEnumeration.hasMoreElements()) {
+            Message message = (Message)browseEnumeration.nextElement();
+            assertTrue(Long.valueOf(message.getJMSExpiration()) <  (currentTime + Duration.ofDays(9).toMillis()));
+            count++;
+        }
+        assertEquals(Integer.valueOf(1), Integer.valueOf(count));
+    }
+
+    private PolicyMap applyHeaderMessageInterceptor(boolean forceDeliveryMode, boolean persistent, boolean forceExpiration, long zeroExpirationOverride, long expirationCeiling) {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+
+        HeaderMessageInterceptorStrategy headerMessageInterceptorStrategy = new HeaderMessageInterceptorStrategy();
+
+        // Persistence related fields
+        headerMessageInterceptorStrategy.setForceDeliveryMode(forceDeliveryMode);
+        headerMessageInterceptorStrategy.setPersistent(persistent);
+
+        // Expiration related fields
+        headerMessageInterceptorStrategy.setForceExpiration(forceExpiration);
+        headerMessageInterceptorStrategy.setZeroExpirationOverride(zeroExpirationOverride);
+        headerMessageInterceptorStrategy.setExpirationCeiling(expirationCeiling);
+        defaultEntry.setMessageInterceptorStrategy(headerMessageInterceptorStrategy);
+
+        IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
+        individualDeadLetterStrategy.setQueuePrefix("");
+        individualDeadLetterStrategy.setQueueSuffix(".dlq");
+        defaultEntry.setDeadLetterStrategy(individualDeadLetterStrategy);
+
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        return policyMap;
+    }
+
+    private Queue createQueue(String queueName) throws Exception {
+        Queue queue = session.createQueue(queueName);
+        producer = session.createProducer(queue);
+        return queue;
+    }
+
+}
\ No newline at end of file