[PIP-82] [pulsar-broker] Add a task to publish resource usage to a topic (#10008)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bbb6896..856f826 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -680,6 +680,26 @@
             + " non-backlog consumers as well.")
     private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "Default policy for publishing usage reports to system topic is disabled."
+            + "This enables publishing of usage reports"
+    )
+    private String resourceUsageTransportClassName = "";
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "Topic to publish usage reports to if resourceUsagePublishToTopic is enabled."
+    )
+    private String resourceUsageTransportPublishTopicName = "non-persistent://pulsar/system/resource-usage";
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
+    )
+    private int resourceUsageTransportPublishIntervalInSecs = 60;
+
     // <-- dispatcher read settings -->
     @FieldContext(
         dynamic = true,
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 3bcdd82..49609c2 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -472,6 +472,7 @@
         <configuration>
           <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
           <checkStaleness>true</checkStaleness>
+          <excludes>**/ResourceUsage.proto</excludes>
         </configuration>
         <executions>
           <execution>
@@ -483,6 +484,22 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+        <groupId>com.github.splunk.lightproto</groupId>
+        <artifactId>lightproto-maven-plugin</artifactId>
+        <version>${lightproto-maven-plugin.version}</version>
+        <configuration>
+          <sources>${project.basedir}/src/main/proto/ResourceUsage.proto</sources>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
     <resources>
       <resource>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aac8755..7235aef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -30,6 +30,7 @@
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -85,6 +86,7 @@
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.protocol.ProtocolHandlers;
+import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
@@ -177,6 +179,7 @@
     private GlobalZooKeeperCache globalZkCache;
     private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
     private Compactor compactor;
+    private ResourceUsageTransportManager resourceUsageTransportManager;
 
     private final ScheduledExecutorService executor;
     private final ScheduledExecutorService cacheExecutor;
@@ -732,6 +735,15 @@
                 this.startPackagesManagementService();
             }
 
+            // Start the task to publish resource usage, if necessary
+            if (config.getResourceUsageTransportClassName() != null
+              && config.getResourceUsageTransportClassName() != "") {
+                Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
+                Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
+                Object object = ctor.newInstance(new Object[] { this });
+                this.resourceUsageTransportManager = (ResourceUsageTransportManager) object;
+            }
+
             final String bootstrapMessage = "bootstrap service "
                     + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
                     + (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "")
@@ -1368,6 +1380,10 @@
         return topicPoliciesService;
     }
 
+    public ResourceUsageTransportManager getResourceUsageTransportManager() {
+        return resourceUsageTransportManager;
+    }
+
     public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
         if (metricsServlet == null) {
             if (pendingMetricsProviders == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java
new file mode 100644
index 0000000..a8be076
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+
+/*
+ * Interface that a resource owner (tenant, namespace or topic) needs to implement.
+ */
+public interface ResourceUsageConsumer {
+    /*
+     * Get the unique identifier for the owner
+     * @return return the owner ID
+     */
+    String getID();
+
+    /*
+     * Listener for resource usage published by other brokers
+     * @param broker name
+     * @param Resource usage object
+     */
+    void acceptResourceUsage(String broker, ResourceUsage resourceUsage);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java
new file mode 100644
index 0000000..7ae0bab
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+
+/*
+ * Interface that a resource owner (tenant, namespace or topic) needs to implement.
+ */
+public interface ResourceUsagePublisher {
+    /*
+     * Get the unique identifier for the owner
+     * @return return the owner ID
+     */
+    String getID();
+
+    /*
+     * Fill the resource usage object that is passed in
+     * @param Resource Usage object
+     */
+    void fillResourceUsage(ResourceUsage resourceUsage);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
new file mode 100644
index 0000000..f7064e4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.SNAPPY;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTransportManager implements AutoCloseable {
+
+    private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+        private final Producer<byte[]> producer;
+        private final ScheduledFuture<?> resourceUsagePublishTask;
+
+        private Producer<byte[]> createProducer() throws PulsarClientException {
+            final int publishDelayMilliSecs = 10;
+            final int sendTimeoutSecs = 10;
+
+            return pulsarClient.newProducer()
+                    .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+                    .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+                    .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+                    .blockIfQueueFull(false)
+                    .compressionType(SNAPPY)
+                    .create();
+        }
+
+        public ResourceUsageWriterTask() throws PulsarClientException {
+            producer = createProducer();
+            resourceUsagePublishTask = pulsarService.getExecutor().scheduleAtFixedRate(
+                    this,
+                    pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+                    pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+                    TimeUnit.SECONDS);
+        }
+
+        @Override
+        public void run() {
+            if (!publisherMap.isEmpty()) {
+                ResourceUsageInfo rUsageInfo = new ResourceUsageInfo();
+                rUsageInfo.setBroker(pulsarService.getBrokerServiceUrl());
+
+                publisherMap.forEach((key, item) -> item.fillResourceUsage(rUsageInfo.addUsageMap()));
+
+                ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer(rUsageInfo.getSerializedSize());
+                rUsageInfo.writeTo(buf);
+
+                byte[] bytes = buf.array();
+                producer.sendAsync(bytes).whenComplete((id, ex) -> {
+                    if (null != ex) {
+                        LOG.error("Resource usage publisher: sending message ID {} error {}", id, ex);
+                    }
+                    buf.release();
+                });
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            resourceUsagePublishTask.cancel(true);
+            producer.close();
+        }
+    }
+
+    private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseable {
+        private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
+        private final Reader<byte[]> consumer;
+
+        public ResourceUsageReader() throws PulsarClientException {
+            consumer =  pulsarClient.newReader()
+                    .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+                    .startMessageId(MessageId.latest)
+                    .readerListener(this)
+                    .create();
+            }
+
+        @Override
+        public void close() throws Exception {
+            consumer.close();
+        }
+
+        @Override
+        public void received(Reader<byte[]> reader, Message<byte[]> msg) {
+            try {
+                recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+
+                recdUsageInfo.getUsageMapsList().forEach(ru -> {
+                    ResourceUsageConsumer owner = consumerMap.get(ru.getOwner());
+                    if (owner != null) {
+                        owner.acceptResourceUsage(recdUsageInfo.getBroker(), ru);
+                    }
+                });
+
+            } catch (IllegalStateException exception) {
+                LOG.error("Resource usage reader: Error parsing incoming message {}", exception);
+            } catch (Exception exception) {
+                LOG.error("Resource usage reader: Unknown exception while parsing message {}", exception);
+            }
+        }
+
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTransportManager.class);
+    private final PulsarService pulsarService;
+    private final PulsarClient pulsarClient;
+    private final ResourceUsageWriterTask pTask;
+    private final ResourceUsageReader consumer;
+    private final Map<String, ResourceUsagePublisher>
+            publisherMap = new ConcurrentHashMap<String, ResourceUsagePublisher>();
+    private final Map<String, ResourceUsageConsumer>
+            consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
+
+    private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
+        // Create a public tenant and default namespace
+        TopicName topicName = TopicName.get(pulsarService.getConfig().getResourceUsageTransportPublishTopicName());
+
+        PulsarAdmin admin = pulsarService.getAdminClient();
+        ServiceConfiguration config = pulsarService.getConfig();
+        String cluster = config.getClusterName();
+
+        final String tenant = topicName.getTenant();
+        final String namespace = topicName.getNamespace();
+
+        List<String> tenantList =  admin.tenants().getTenants();
+        if (!tenantList.contains(tenant)) {
+            admin.tenants().createTenant(tenant,
+                    new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+        }
+        List<String> nsList = admin.namespaces().getNamespaces(tenant);
+        if (!nsList.contains(namespace)) {
+            admin.namespaces().createNamespace(namespace);
+        }
+    }
+
+    public ResourceUsageTransportManager(PulsarService pulsarService) throws Exception {
+        this.pulsarService = pulsarService;
+        this.pulsarClient = pulsarService.getClient();
+
+        try {
+            createTenantAndNamespace();
+            consumer = new ResourceUsageReader();
+            pTask = new ResourceUsageWriterTask();
+        } catch (Exception ex) {
+            LOG.error("Error initializing resource usage transport manager: {}", ex);
+            throw ex;
+        }
+    }
+
+    /*
+     * Register a resource owner (resource-group, tenant, namespace, topic etc).
+     *
+     * @param resource usage publisher
+     */
+    public void registerResourceUsagePublisher(ResourceUsagePublisher r) {
+        publisherMap.put(r.getID(), r);
+    }
+
+    /*
+     * Unregister a resource owner (resource-group, tenant, namespace, topic etc).
+     *
+     * @param resource usage publisher
+     */
+    public void unregisterResourceUsageProducer(ResourceUsagePublisher r) {
+        publisherMap.remove(r.getID());
+    }
+
+    /*
+     * Register a resource owner (resource-group, tenant, namespace, topic etc).
+     *
+     * @param resource usage consumer
+     */
+    public void registerResourceUsageConsumer(ResourceUsageConsumer r) {
+        consumerMap.put(r.getID(), r);
+    }
+
+    /*
+     * Unregister a resource owner (resource-group, tenant, namespace, topic etc).
+     *
+     * @param resource usage consumer
+     */
+    public void unregisterResourceUsageConsumer(ResourceUsageConsumer r) {
+        consumerMap.remove(r.getID());
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            pTask.close();
+            consumer.close();
+        } catch (Exception ex1) {
+            LOG.error("Error closing producer/consumer for resource-usage topic {}", ex1);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java
new file mode 100644
index 0000000..ba7ee9e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
\ No newline at end of file
diff --git a/pulsar-broker/src/main/proto/ResourceUsage.proto b/pulsar-broker/src/main/proto/ResourceUsage.proto
new file mode 100644
index 0000000..4706c9d
--- /dev/null
+++ b/pulsar-broker/src/main/proto/ResourceUsage.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.resource.usage;
+option java_package = "org.apache.pulsar.broker.service.resource.usage";
+option optimize_for = SPEED;
+
+message NetworkUsage {
+  required uint64 bytesPerPeriod = 1;
+  required uint64 messagesPerPeriod = 2;
+}
+
+message StorageUsage {
+  required uint64 totalBytes = 1;
+}
+
+message ResourceUsage {
+  // owner is the key(ID) of the entity that reports the usage
+  // It could be a resource-group or tenant, namespace or a topic
+  required string owner = 1;
+  optional NetworkUsage publish = 2;
+  optional NetworkUsage dispatch = 3;
+  optional StorageUsage storage = 4;
+}
+
+message ResourceUsageInfo {
+  // Name of the broker
+  required string broker = 1;
+
+  repeated ResourceUsage usageMap = 2;
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
new file mode 100644
index 0000000..4c1f186
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+
+public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTest {
+
+    private static final String INTERNAL_TOPIC = "non-persistent://pulsar-test/test/resource-usage";
+    private static final int PUBLISH_INTERVAL_SECS = 1;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        prepareData();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testNamespaceCreation() throws Exception {
+        ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
+        TopicName topicName = TopicName.get(INTERNAL_TOPIC);
+
+        assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
+        assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
+
+    }
+    
+    @Test
+    public void testPublish() throws Exception {
+        ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
+        ResourceUsage recvdUsage = new ResourceUsage();
+        final String[] recvdBroker = new String[1];
+
+        ResourceUsagePublisher p = new ResourceUsagePublisher() {
+
+            @Override
+            public String getID() {
+                return "resource-group1";
+            }
+
+            @Override
+            public void fillResourceUsage(ResourceUsage resourceUsage) {
+
+                resourceUsage.setOwner(getID());
+                resourceUsage.setPublish().setMessagesPerPeriod(1000).setBytesPerPeriod(10001);
+                resourceUsage.setStorage().setTotalBytes(500003);
+
+            }
+        };
+
+        ResourceUsageConsumer c = new ResourceUsageConsumer() {
+            @Override
+            public String getID() {
+                return "resource-group1";
+            }
+
+            @Override
+            public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
+
+                recvdBroker[0] = broker;
+                recvdUsage.setOwner(resourceUsage.getOwner());
+                NetworkUsage p = recvdUsage.setPublish();
+                p.setBytesPerPeriod(resourceUsage.getPublish().getBytesPerPeriod());
+                p.setMessagesPerPeriod(resourceUsage.getPublish().getMessagesPerPeriod());
+
+                recvdUsage.setStorage().setTotalBytes(resourceUsage.getStorage().getTotalBytes());
+            }
+        };
+
+        tManager.registerResourceUsagePublisher(p);
+        tManager.registerResourceUsageConsumer(c);
+
+        Thread.sleep((PUBLISH_INTERVAL_SECS + 1) * 1000);
+
+        assertEquals(recvdBroker[0], pulsar.getBrokerServiceUrl());
+        assertNotNull(recvdUsage.getPublish());
+        assertNotNull(recvdUsage.getStorage());
+        assertEquals(recvdUsage.getPublish().getBytesPerPeriod(), 10001);
+        assertEquals(recvdUsage.getStorage().getTotalBytes(), 500003);
+    }
+
+    private void prepareData() throws PulsarAdminException {
+        this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
+        this.conf.setResourceUsageTransportPublishTopicName(INTERNAL_TOPIC);
+        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
+    }
+}
\ No newline at end of file