Delete no used code (#174)
### Motivation
There are some unused codes in the current pulsar repository that need to be cleared.
### Modifications
* Delete spring-pulsar client
* Delete no used table
diff --git a/build.gradle b/build.gradle
index 9de840f..e268194 100644
--- a/build.gradle
+++ b/build.gradle
@@ -20,7 +20,6 @@
repositories {
maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
- maven { url "https://dl.bintray.com/streamnative/maven" }
jcenter()
}
@@ -72,12 +71,10 @@
compile group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
compile group: 'com.google.guava', name: 'guava', version: guavaVersion
compile group: 'com.google.code.gson', name: 'gson', version: gsonVersion
- compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
compile group: 'org.apache.pulsar', name: 'pulsar-common', version: pulsarVersion
compile group: 'io.springfox', name: 'springfox-swagger2', version: swagger2Version
compile group: 'io.springfox', name: 'springfox-swagger-ui', version: swaggeruiVersion
compile group: 'org.powermock', name: 'powermock-api-mockito', version: apiMockitoVersion
compile group: 'org.powermock', name: 'powermock-module-junit4', version: mockitoJunit4Version
compileOnly group: 'org.projectlombok', name: 'lombok', version: lombokVersion
- testCompile group: 'com.h2database', name: 'h2', version: h2databaseVersion
}
diff --git a/docker/init_db.sql b/docker/init_db.sql
index 2b417ff..824b6cb 100644
--- a/docker/init_db.sql
+++ b/docker/init_db.sql
@@ -8,93 +8,6 @@
USE pulsar_manager;
-CREATE TABLE IF NOT EXISTS tenants (
- tenant varchar(255) NOT NULL PRIMARY KEY,
- tenantId BIGINT NOT NULL,
- adminRoles TEXT,
- allowedClusters TEXT,
- UNIQUE (tenantId)
-) ENGINE=InnoDB CHARACTER SET utf8;
-
-
-CREATE TABLE IF NOT EXISTS namespaces (
- namespaceId BIGINT NOT NULL,
- tenant varchar(255) NOT NULL,
- namespace varchar(255) NOT NULL,
- authPolicies TEXT,
- backlogQuota TEXT,
- replicationClusters TEXT,
- numBundles BIGINT,
- boundaries TEXT,
- topicDispatchRate TEXT,
- subscriptionDispatchRate TEXT,
- replicatorDispatchRate TEXT,
- clusterSubscribeRate TEXT,
- bookkeeperEnsemble BIGINT,
- bookkeeperWriteQuorum BIGINT,
- bookkeeperAckQuorum BIGINT,
- managedLedgerMaxMarkDeleteRate double,
- deduplicationEnabled BOOLEAN,
- latencyStatsSampleRate TEXT,
- messageTtlInSeconds BIGINT,
- retentionTimeInMinutes BIGINT,
- retentionSizeInMB BIGINT,
- deleted BOOLEAN,
- antiAffinityGroup VARCHAR(255),
- encryptionRequired BOOLEAN,
- subscriptionAuthMode VARCHAR(12),
- maxProducersPerTopic BIGINT,
- maxConsumersPerTopic BIGINT,
- maxConsumersPerSubscription BIGINT,
- compactionThreshold BIGINT,
- offloadThreshold BIGINT,
- offloadDeletionLagMs BIGINT,
- schemaValidationEnforced BOOLEAN,
- schemaAutoApdateCompatibilityStrategy VARCHAR(36),
- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
- CONSTRAINT PK_namespace PRIMARY KEY (tenant, namespace),
- UNIQUE (namespaceId)
-) ENGINE=InnoDB CHARACTER SET utf8;
--- ALTER table namespaces ADD INDEX namespaces_namespace_index(namespace);
-
-CREATE TABLE IF NOT EXISTS clusters (
- cluster varchar(255) NOT NULL PRIMARY KEY,
- clusterId BIGINT NOT NULL,
- serviceUrl varchar(1024),
- serviceUrlTls varchar(1024),
- brokerServiceUrl varchar(1024),
- brokerServiceUrlTls varchar(1024),
- peerClusterNames varchar(1024),
- UNIQUE (clusterId)
-) ENGINE=InnoDB CHARACTER SET utf8;
--- ALTER table clusters ADD INDEX clusters_cluster_index(cluster);
-
-CREATE TABLE IF NOT EXISTS brokers (
- broker varchar(1024) NOT NULL PRIMARY KEY,
- brokerId BIGINT NOT NULl,
- webServiceUrl varchar(1024),
- webServiceUrlTls varchar(1024),
- pulsarServiceUrl varchar(1024),
- pulsarServiceUrlTls varchar(1024),
- persistentTopicsEnabled BOOLEAN,
- nonPersistentTopicsEnabled BOOLEAN,
- brokerVersionString varchar(36),
- loadReportType varchar(36),
- maxResourceUsage double,
- UNIQUE (brokerId)
-) ENGINE=InnoDB CHARACTER SET utf8;
-
--- CREATE TABLE IF NOT EXISTS bundles (
--- broker varchar(1024) NOT NULL,
--- tenant varchar(255) NOT NULL,
--- namespace varchar(255) NOT NULL,
--- bundle varchar(1024) NOT NULL,
--- CONSTRAINT FK_broker FOREIGN KEY (broker) References brokers(broker),
--- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
--- CONSTRAINT FK_namespace FOREIGN KEY (namespace) References namespaces(namespace),
--- CONSTRAINT PK_bundle PRIMARY KEY (broker, tenant, namespace, bundle)
--- ) ENGINE=InnoDB CHARACTER SET utf8;
-
CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL,
broker varchar(1024) NOT NULL,
diff --git a/gradle.properties b/gradle.properties
index a8d5e49..1a3dabf 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -8,10 +8,9 @@
pageHelperVersion=1.2.4
mockitoVersion=1.10.19
guavaVersion=21.0
-pulsarVersion=2.5.0-2cc34afc0
+pulsarVersion=2.4.0
swagger2Version=2.9.2
swaggeruiVersion=2.9.2
apiMockitoVersion=1.7.1
mockitoJunit4Version=1.7.1
gsonVersion=2.8.2
-h2databaseVersion=1.4.199
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/Client.java b/src/main/java/io/streamnative/pulsar/manager/client/Client.java
deleted file mode 100644
index 9ba5ccc..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/Client.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import io.streamnative.pulsar.manager.client.config.ClientConfigurationData;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Pulsar Client init class.
- */
-public class Client implements AutoCloseable {
-
- private PulsarClient pulsarClient;
-
- private final ClientConfigurationData clientConfigurationData;
-
- public Client(ClientConfigurationData clientConfigurationData) {
- this.clientConfigurationData = clientConfigurationData;
- }
-
- public PulsarClient getPulsarClient() throws PulsarClientException {
- if (pulsarClient == null) {
- ClientBuilder clientBuilder = PulsarClient.builder();
- checkAndInitClientConfig(clientBuilder);
- pulsarClient = clientBuilder.build();
- }
- return pulsarClient;
- }
-
- public void checkAndInitClientConfig(ClientBuilder clientBuilder) {
- Preconditions.checkArgument(clientConfigurationData.getServiceUrl() != null
- && clientConfigurationData.getServiceUrl().startsWith("pulsar"), "Serviceurl is incorrect");
- clientBuilder.serviceUrl(clientConfigurationData.getServiceUrl());
- if (clientConfigurationData.getOperationTimeout() != null) {
- Preconditions.checkArgument(clientConfigurationData.getOperationTimeout() > 0,
- "Parameter operationTimeout should be greater than 0");
- clientBuilder.operationTimeout(clientConfigurationData.getOperationTimeout(), TimeUnit.MILLISECONDS);
- }
- if (clientConfigurationData.getEnableTcpNoDelay() != null) {
- clientBuilder.enableTcpNoDelay(clientConfigurationData.getEnableTcpNoDelay());
- }
- if (clientConfigurationData.getIoThreads() != null) {
- Preconditions.checkArgument(clientConfigurationData.getIoThreads() > 0,
- "Parameter ioThreads should be greater than 0");
- clientBuilder.ioThreads(clientConfigurationData.getIoThreads());
- }
- if (clientConfigurationData.getListenerThreads() != null) {
- Preconditions.checkArgument(clientConfigurationData.getListenerThreads() > 0,
- "Parameter listenerThreads should be greater than 0");
- clientBuilder.listenerThreads(clientConfigurationData.getListenerThreads());
- }
- if (clientConfigurationData.getConnectionsPerBroker() != null) {
- Preconditions.checkArgument(clientConfigurationData.getConnectionsPerBroker() > 0,
- "Parameter connectionsPerBroker should be greater than 0");
- clientBuilder.connectionsPerBroker(clientConfigurationData.getConnectionsPerBroker());
- }
- if (clientConfigurationData.getTlsTrustCertsFilePath() != null) {
- Preconditions.checkArgument(clientConfigurationData.getTlsTrustCertsFilePath().length() > 0,
- "Parameter tlsTrustCertsFilePath should be set");
- clientBuilder.tlsTrustCertsFilePath(clientConfigurationData.getTlsTrustCertsFilePath());
- }
- if (clientConfigurationData.getAllowTlsInsecureConnection() != null) {
- clientBuilder.allowTlsInsecureConnection(clientConfigurationData.getAllowTlsInsecureConnection());
- }
- if (clientConfigurationData.getEnableTlsHostnameVerification() != null) {
- clientBuilder.enableTlsHostnameVerification(clientConfigurationData.getEnableTlsHostnameVerification());
- }
- if (clientConfigurationData.getStatsInterval() != null) {
- Preconditions.checkArgument(clientConfigurationData.getStatsInterval() > 0L,
- "Parameter statsInterval should be greater than 0");
- clientBuilder.statsInterval(clientConfigurationData.getStatsInterval(), TimeUnit.SECONDS);
- }
- if (clientConfigurationData.getMaxConcurrentLookupRequests() != null) {
- Preconditions.checkArgument(clientConfigurationData.getMaxConcurrentLookupRequests() > 0,
- "Parameter maxConcurrentLookupRequests should be greater than 0");
- clientBuilder.maxConcurrentLookupRequests(clientConfigurationData.getMaxConcurrentLookupRequests());
- }
- if (clientConfigurationData.getMaxLookupRequests() != null) {
- Preconditions.checkArgument(clientConfigurationData.getMaxLookupRequests() > 0,
- "Parameter maxLookupRequests should be greater than 0");
- clientBuilder.maxLookupRequests(clientConfigurationData.getMaxLookupRequests());
- }
- if (clientConfigurationData.getMaxNumberOfRejectedRequestPerConnection() != null) {
- Preconditions.checkArgument(clientConfigurationData.getMaxNumberOfRejectedRequestPerConnection() > 0,
- "Parameter maxNumberOfRejectedRequestPerConnection should be greater than 0");
- clientBuilder.maxNumberOfRejectedRequestPerConnection(clientConfigurationData.getMaxNumberOfRejectedRequestPerConnection());
- }
- if (clientConfigurationData.getKeepAliveInterval() != null) {
- Preconditions.checkArgument(clientConfigurationData.getKeepAliveInterval() > 0,
- "Parameter keepAliveInterval should be greater than 0");
- clientBuilder.keepAliveInterval(clientConfigurationData.getKeepAliveInterval(), TimeUnit.SECONDS);
- }
- if (clientConfigurationData.getConnectionTimeout() != null) {
- Preconditions.checkArgument(clientConfigurationData.getConnectionTimeout() > 0,
- "Parameter connectionTimeout should be greater than 0");
- clientBuilder.connectionTimeout(clientConfigurationData.getConnectionTimeout(), TimeUnit.MILLISECONDS);
- }
- if (clientConfigurationData.getStartingBackoffInterval() != null) {
- Preconditions.checkArgument(clientConfigurationData.getStartingBackoffInterval() > 0,
- "Parameter startingBackoffInterval should be greater than 0");
- clientBuilder.startingBackoffInterval(clientConfigurationData.getStartingBackoffInterval(), TimeUnit.MILLISECONDS);
- }
- if (clientConfigurationData.getMaxBackoffInterval() != null) {
- Preconditions.checkArgument(clientConfigurationData.getMaxBackoffInterval() > 0,
- "Parameter maxBackoffInterval should be greater than 0");
- clientBuilder.maxBackoffInterval(clientConfigurationData.getMaxBackoffInterval(), TimeUnit.MILLISECONDS);
- }
- }
-
- public void close() throws Exception {
- if (pulsarClient != null) {
- pulsarClient.close();
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this).add("serviceUrl", clientConfigurationData.getServiceUrl())
- .add("operationTimeout", clientConfigurationData.getOperationTimeout())
- .add("ioThreads", clientConfigurationData.getIoThreads())
- .add("listenerThreads", clientConfigurationData.getListenerThreads())
- .add("connectionsPerBroker", clientConfigurationData.getConnectionsPerBroker())
- .add("tlsTrustCertsFilePath", clientConfigurationData.getTlsTrustCertsFilePath())
- .add("allowTlsInsecureConnection", clientConfigurationData.getAllowTlsInsecureConnection())
- .add("enableTlsHostnameVerification", clientConfigurationData.getEnableTlsHostnameVerification())
- .add("statsInterval", clientConfigurationData.getStatsInterval())
- .add("maxConcurrentLookupRequests", clientConfigurationData.getMaxConcurrentLookupRequests())
- .add("maxLookupRequests", clientConfigurationData.getMaxLookupRequests())
- .add("maxNumberOfRejectedRequestPerConnection", clientConfigurationData.getMaxNumberOfRejectedRequestPerConnection())
- .add("keepAliveInterval", clientConfigurationData.getKeepAliveInterval())
- .add("connectionTimeout", clientConfigurationData.getConnectionTimeout())
- .add("startingBackoffInterval", clientConfigurationData.getStartingBackoffInterval())
- .add("maxBackoffInterval", clientConfigurationData.getMaxBackoffInterval()).toString();
- }
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/PulsarApplicationListener.java b/src/main/java/io/streamnative/pulsar/manager/client/PulsarApplicationListener.java
deleted file mode 100644
index f3b6fee..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/PulsarApplicationListener.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client;
-
-import io.streamnative.pulsar.manager.client.config.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.event.ContextRefreshedEvent;
-import org.springframework.stereotype.Component;
-
-/**
- * PulsarApplicationListener do something after the spring framework initialization is complete.
- */
-@Component
-public class PulsarApplicationListener implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
-
- private static final Logger log = LoggerFactory.getLogger(PulsarApplicationListener.class);
-
- private ConfigurableApplicationContext applicationContext;
-
- private Client client;
-
- private ClientConfigurationData clientConfigurationData;
-
- @Value("${pulsar.client.serviceUrl}")
- private String pulsarServiceUrl;
-
- @Value("${pulsar.client.operationTimeout:30000}")
- private Integer operationTimeout;
-
- @Value("${pulsar.client.ioThreads:1}")
- private Integer ioThreads;
-
- @Value("${pulsar.client.listenerThreads:1}")
- private Integer listenerThreads;
-
- @Value("${pulsar.client.connectionsPerBroker:1}")
- private Integer connectionsPerBroker;
-
- @Value("${pulsar.client.enableTcpNoDelay:false}")
- private Boolean enableTcpNoDelay;
-
- @Value("${pulsar.client.tlsTrustCertsFilePath:}")
- private String tlsTrustCertsFilePath;
-
- @Value("${pulsar.client.allowTlsInsecureConnection:false}")
- private Boolean allowTlsInsecureConnection;
-
- @Value("${pulsar.client.enableTlsHostnameVerification:false}")
- private Boolean enableTlsHostnameVerification;
-
- @Value("${pulsar.client.statsInterval:60}")
- private Long statsInterval;
-
- @Value("${pulsar.client.maxConcurrentLookupRequests:5000}")
- private Integer maxConcurrentLookupRequests;
-
- @Value("${pulsar.client.maxLookupRequests:50000}")
- private Integer maxLookupRequests;
-
- @Value("${pulsar.client.maxNumberOfRejectedRequestPerConnection:50}")
- private Integer maxNumberOfRejectedRequestPerConnection;
-
- @Value("${pulsar.client.keepAliveInterval:30}")
- private Integer keepAliveInterval;
-
- @Value("${pulsar.client.connectionTimeout:100000}")
- private Integer connectionTimeout;
-
- @Value("${pulsar.client.startingBackoffInterval:100}")
- private Long startingBackoffInterval;
-
- @Value("${pulsar.client.maxBackoffInterval:30}")
- private Long maxBackoffInterval;
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- if (applicationContext instanceof ConfigurableApplicationContext) {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
- }
- }
-
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- log.info("Start onApplicationEvent");
- }
-
- private void initPulsarClientConfig() {
- clientConfigurationData.setServiceUrl(pulsarServiceUrl);
- clientConfigurationData.setOperationTimeout(operationTimeout);
- clientConfigurationData.setAllowTlsInsecureConnection(allowTlsInsecureConnection);
- clientConfigurationData.setEnableTcpNoDelay(enableTcpNoDelay);
- clientConfigurationData.setIoThreads(ioThreads);
- clientConfigurationData.setListenerThreads(listenerThreads);
- clientConfigurationData.setConnectionTimeout(connectionTimeout);
- clientConfigurationData.setEnableTcpNoDelay(enableTcpNoDelay);
- if (tlsTrustCertsFilePath != null && tlsTrustCertsFilePath.length() > 0) {
- clientConfigurationData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
- }
- clientConfigurationData.setEnableTlsHostnameVerification(enableTlsHostnameVerification);
- clientConfigurationData.setStatsInterval(statsInterval);
- clientConfigurationData.setMaxConcurrentLookupRequests(maxConcurrentLookupRequests);
- clientConfigurationData.setMaxLookupRequests(maxLookupRequests);
- clientConfigurationData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
- clientConfigurationData.setKeepAliveInterval(keepAliveInterval);
- clientConfigurationData.setStartingBackoffInterval(startingBackoffInterval);
- clientConfigurationData.setMaxBackoffInterval(maxBackoffInterval);
- }
-
- public Client getClient() {
- if (client == null) {
- clientConfigurationData = new ClientConfigurationData();
- this.initPulsarClientConfig();
- client = new Client(clientConfigurationData);
- }
- return client;
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListener.java b/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListener.java
deleted file mode 100644
index 3ecf1f2..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListener.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.annotation;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.schema.SchemaType;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * An annotation is used to initialize the PulsarConsumer's related configuration.
- * <pre>{@code
- * @PulsarListener(topic = "test567", subscriptionName = "test567",
- * schema = PulsarTopicEvent.class, schemaType = SchemaType.AVRO)
- * public void receive(Message message) {
- * log.info("Received messages: {}", message.toString());
- * }
- * }</pre>
- */
-@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
-@Retention(RetentionPolicy.RUNTIME)
-public @interface PulsarListener {
-
- /**
- * Unique id of the container thread.
- */
- String id() default "";
-
- /**
- * Specify multi topic that this consumer will subscribe on.
- */
- String[] topics();
-
- /**
- * Specify a pattern for topics that this consumer will subscribe on.
- */
- String topicsPattern() default "";
-
- /**
- * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
- * 10 seconds.
- */
- long ackTimeout() default 10L;
-
- /**
- * Set the delay to wait before re-delivering messages that have failed to be process. The default is 1 min.
- */
- long negativeAckRedeliveryDelay() default 60000L;
-
- /**
- * Sets the size of the consumer receive queue.
- * Default value is {@code 1000} messages and should be good for most use cases.
- */
- int receiverQueueSize() default 1000;
-
- /**
- * Group the consumer acknowledgments for the specified time. Default is 100ms
- */
- long acknowledgmentGroupTime() default 0;
-
- /**
- * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
- * messages.
- */
- int priorityLevel() default 0;
-
- /**
- * Specify the subscription name for this consumer.
- */
- String subscriptionName();
-
- /**
- * Schema to use when receiving messages, it can be a custom pojo class, default is Byte.class.
- */
- Class schema() default Byte.class;
-
- /**
- * Types of supported schema for Pulsar messages.
- */
- SchemaType schemaType() default SchemaType.BYTES;
-
- /**
- * Select the subscription type to be used when subscribing to the topic.
- */
- SubscriptionType subscriptionType() default SubscriptionType.Exclusive;
-
- /**
- * If enabled, the consumer will auto subscribe for partitions increasement. This is only for partitioned consumer.
- */
- boolean autoUpdatePartitions() default false;
-
- /**
- * Set the consumer name.
- */
- String consumerName() default "";
-
- /**
- * Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used
- * with pattern subscriptions.
- */
- RegexSubscriptionMode regexSubscriptionMode() default RegexSubscriptionMode.PersistentOnly;
-}
\ No newline at end of file
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListenerPostProcessor.java b/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListenerPostProcessor.java
deleted file mode 100644
index 06207b9..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/annotation/PulsarListenerPostProcessor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.annotation;
-
-import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
-import io.streamnative.pulsar.manager.client.config.PulsarBootstrapConfiguration;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import io.streamnative.pulsar.manager.client.config.PulsarConsumerConfigRegister;
-import io.streamnative.pulsar.manager.client.consumer.PulsarConsumerContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.aop.support.AopUtils;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.BeanFactory;
-import org.springframework.beans.factory.BeanFactoryAware;
-import org.springframework.beans.factory.SmartInitializingSingleton;
-import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
-import org.springframework.core.MethodIntrospector;
-import org.springframework.core.annotation.AnnotatedElementUtils;
-
-import java.lang.reflect.Method;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Parse annotation, for example PulsarListener.
- */
-public class PulsarListenerPostProcessor implements BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton {
-
- private static final Logger log = LoggerFactory.getLogger(PulsarListenerPostProcessor.class);
-
- private BeanFactory beanFactory;
-
- private final PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
-
- private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(
- new ConcurrentHashMap<>(64));
-
- private static final String PREFIX_ID = "spring.pulsar.container#";
-
- private final AtomicInteger counter = new AtomicInteger();
-
- @Override
- public void setBeanFactory(BeanFactory beanFactory) {
- log.info("Start set bean Factory");
- this.beanFactory = beanFactory;
- if (beanFactory instanceof ConfigurableListableBeanFactory) {
- PulsarApplicationListener pulsarApplicationListener = this.beanFactory.getBean(
- PulsarBootstrapConfiguration.PULSAR_APPLICATION_LISTENER, PulsarApplicationListener.class);
- this.pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
- }
- }
-
- @Override
- public void afterSingletonsInstantiated() {
- this.pulsarConsumerConfigRegister.afterPropertiesSet();
- }
-
- @Override
- public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
- return bean;
- }
-
- // Parse annotation
- @Override
- public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
- if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
- Class<?> targetClass = AopUtils.getTargetClass(bean);
- Map<Method, Set<PulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
- (MethodIntrospector.MetadataLookup<Set<PulsarListener>>) method -> {
- Set<PulsarListener> listenerMethods = findListenerAnnotations(method);
- return (!listenerMethods.isEmpty() ? listenerMethods : null);
- });
- if (annotatedMethods.isEmpty()) {
- this.nonAnnotatedClasses.add(bean.getClass());
- } else {
- // Custom annotation
- for (Map.Entry<Method, Set<PulsarListener>> entry : annotatedMethods.entrySet()) {
- Method method = entry.getKey();
- for (PulsarListener listener : entry.getValue()) {
- processPulsarListener(listener, method, bean, beanName);
- }
- }
- }
- }
- return bean;
- }
-
- private Set<PulsarListener> findListenerAnnotations(Method method) {
- Set<PulsarListener> listeners = new HashSet<>();
- PulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(method, PulsarListener.class);
- if (ann != null) {
- listeners.add(ann);
- }
- return listeners;
- }
-
- private void processPulsarListener(PulsarListener pulsarListener, Method method, Object bean, String beanName) {
- ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData();
- if (pulsarListener.id().length() > 0) {
- consumerConfigurationData.setId(pulsarListener.id());
- } else {
- consumerConfigurationData.setId(PREFIX_ID + this.counter.getAndIncrement());
- }
- consumerConfigurationData.setSubscriptionName(pulsarListener.subscriptionName());
- consumerConfigurationData.setSubscriptionType(pulsarListener.subscriptionType());
- consumerConfigurationData.setAckTimeout(pulsarListener.ackTimeout());
- consumerConfigurationData.setAcknowledgmentGroupTime(pulsarListener.acknowledgmentGroupTime());
- consumerConfigurationData.setNegativeAckRedeliveryDelay(pulsarListener.negativeAckRedeliveryDelay());
- consumerConfigurationData.setReceiverQueueSize(pulsarListener.receiverQueueSize());
- consumerConfigurationData.setAcknowledgmentGroupTime(pulsarListener.acknowledgmentGroupTime());
- consumerConfigurationData.setTopics(pulsarListener.topics());
- consumerConfigurationData.setPriorityLevel(pulsarListener.priorityLevel());
- consumerConfigurationData.setTopicsPattern(pulsarListener.topicsPattern());
- consumerConfigurationData.setAutoUpdatePartitions(pulsarListener.autoUpdatePartitions());
- consumerConfigurationData.setConsumerName(pulsarListener.consumerName());
- consumerConfigurationData.setRegexSubscriptionMode(pulsarListener.regexSubscriptionMode());
- consumerConfigurationData.setMethod(method);
- consumerConfigurationData.setBean(bean);
- consumerConfigurationData.setSchema(pulsarListener.schema());
- consumerConfigurationData.setSchemaType(pulsarListener.schemaType());
- log.info("Initialization configured to {} use configuration {}", beanName, consumerConfigurationData.toString());
- this.pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
- }
-
- public PulsarConsumerContainer getPulsarConsumerContainer(String id) {
- return this.pulsarConsumerConfigRegister.getConsumerContainer(id);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/config/ClientConfigurationData.java b/src/main/java/io/streamnative/pulsar/manager/client/config/ClientConfigurationData.java
deleted file mode 100644
index b9d9440..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/config/ClientConfigurationData.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
-/**
- * Pulsar Client Configuration.
- */
-@Getter
-@Setter
-@ToString
-public class ClientConfigurationData {
-
- private String serviceUrl;
-
- private Integer operationTimeout;
-
- private Integer ioThreads;
-
- private Integer listenerThreads;
-
- private Integer connectionsPerBroker;
-
- private Boolean enableTcpNoDelay;
-
- private String tlsTrustCertsFilePath;
-
- private Boolean allowTlsInsecureConnection;
-
- private Boolean enableTlsHostnameVerification;
-
- private Long statsInterval;
-
- private Integer maxConcurrentLookupRequests;
-
- private Integer maxLookupRequests;
-
- private Integer maxNumberOfRejectedRequestPerConnection;
-
- private Integer keepAliveInterval;
-
- private Integer connectionTimeout;
-
- private Long startingBackoffInterval;
-
- private Long maxBackoffInterval;
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/config/ConsumerConfigurationData.java b/src/main/java/io/streamnative/pulsar/manager/client/config/ConsumerConfigurationData.java
deleted file mode 100644
index 3b2f25b..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/config/ConsumerConfigurationData.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.schema.SchemaType;
-
-import java.lang.reflect.Method;
-
-/**
- * Pulsar PulsarConsumer Configuration{@link PulsarListener} class.
- */
-@Getter
-@Setter
-@ToString
-public class ConsumerConfigurationData {
-
- private String id;
-
- private String[] topics;
-
- private String topicsPattern;
-
- private Long ackTimeout;
-
- private Long negativeAckRedeliveryDelay;
-
- private Integer receiverQueueSize;
-
- private Long acknowledgmentGroupTime;
-
- private Integer priorityLevel;
-
- private Method method;
-
- private Class schema;
-
- private SchemaType schemaType;
-
- private Object bean;
-
- private String subscriptionName;
-
- private SubscriptionType subscriptionType;
-
- private Boolean autoUpdatePartitions;
-
- private String consumerName;
-
- private RegexSubscriptionMode regexSubscriptionMode;
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarBootstrapConfiguration.java b/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarBootstrapConfiguration.java
deleted file mode 100644
index 4a47a83..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarBootstrapConfiguration.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.config;
-
-import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
-import io.streamnative.pulsar.manager.client.annotation.PulsarListenerPostProcessor;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Initialize two bean components, PULSAR_APPLICATION_LISTENER and PULSAR_LISTENER_POST_PROCESSOR.
- * PULSAR_APPLICATION_LISTENER for set pulsarClient.
- * PULSAR_LISTENER_POST_PROCESSOR for parse annotation.
- */
-@Configuration
-public class PulsarBootstrapConfiguration {
-
- public static final String PULSAR_LISTENER_POST_PROCESSOR = "PULSAR_LISTENER_POST_PROCESSOR";
-
- public static final String PULSAR_APPLICATION_LISTENER = "PULSAR_APPLICATION_LISTENER";
-
-
- @Bean(name = PULSAR_APPLICATION_LISTENER)
- public PulsarApplicationListener defaultPulsarApplicationListener() {
- return new PulsarApplicationListener();
- }
-
- @Bean(name = PULSAR_LISTENER_POST_PROCESSOR)
- public PulsarListenerPostProcessor defaultPulsarListenerPostProcessor() { return new PulsarListenerPostProcessor(); }
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarConsumerConfigRegister.java b/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarConsumerConfigRegister.java
deleted file mode 100644
index 28325bf..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/config/PulsarConsumerConfigRegister.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.config;
-
-import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
-import io.streamnative.pulsar.manager.client.consumer.PulsarConsumerContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * ConsumerConfigurationData register class.
- */
-public class PulsarConsumerConfigRegister implements InitializingBean {
-
- private static final Logger log = LoggerFactory.getLogger(PulsarConsumerConfigRegister.class);
-
- private final Map<String, PulsarConsumerContainer> consumerContainers = new ConcurrentHashMap<>();
-
- private PulsarApplicationListener pulsarApplicationListener;
-
- public void setPulsarApplicationListener(PulsarApplicationListener pulsarApplicationListener) {
- this.pulsarApplicationListener = pulsarApplicationListener;
- }
-
- public PulsarApplicationListener getPulsarApplicationListener() {
- return this.pulsarApplicationListener;
- }
-
- public void setConsumerContainer(ConsumerConfigurationData consumerConfigurationData) {
- synchronized (this.consumerContainers) {
- log.info("Start init consumer execute container");
- PulsarConsumerContainer pulsarConsumerContainer = new PulsarConsumerContainer(
- pulsarApplicationListener.getClient(), consumerConfigurationData);
- this.consumerContainers.put(consumerConfigurationData.getId(), pulsarConsumerContainer);
- }
- }
-
- @Override
- public void afterPropertiesSet() {
- startAllContainers();
- }
-
- public void startAllContainers() {
- synchronized (this.consumerContainers) {
- this.consumerContainers.forEach((k, v) -> {
- v.start();
- });
- }
- }
-
- public void stopAllContainers() {
- synchronized (this.consumerContainers) {
- this.consumerContainers.forEach((k, v) -> {
- v.stop(null);
- });
- }
- }
-
- public PulsarConsumerContainer getConsumerContainer(String id) {
- return this.consumerContainers.get(id);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumer.java b/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumer.java
deleted file mode 100644
index b9e3998..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumer.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.consumer;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import io.streamnative.pulsar.manager.client.Client;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Pulsar Consumer initialization.
- */
-public class PulsarConsumer implements AutoCloseable {
-
- private static final Logger log = LoggerFactory.getLogger(PulsarConsumer.class);
-
- private final Client client;
-
- private final ConsumerConfigurationData consumerConfigurationData;
-
- private Schema schema;
-
- private Consumer consumer;
-
- PulsarConsumer(Client client, ConsumerConfigurationData consumerConfigurationData) {
- this.consumerConfigurationData = consumerConfigurationData;
- this.client = client;
- }
-
- public Consumer getConsumer() {
- if (consumer != null) {
- return consumer;
- }
- try {
- PulsarClient pulsarClient = client.getPulsarClient();
- if (schema == null) {
- schema = initSchema(consumerConfigurationData.getSchemaType(), consumerConfigurationData.getSchema());
- }
- ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(schema);
- initConsumerConfig(consumerBuilder);
- consumer = consumerBuilder.subscribe();
- log.info("Init pulsar client and pulsar consumer success use client configuration: {}," +
- "consumer configuration: {}", client.toString(), consumerConfigurationData.toString());
- } catch (PulsarClientException e) {
- log.error("Init pulsar client and consumer failed throws PulsarClientException, error: {}", e.getMessage());
- throw new RuntimeException("Init Pulsar Client failed.", e);
- } catch (Exception e) {
- log.error("Init client and consumer failed, exception: {}, use client=>{} and configuration=>{}",
- e.getMessage(), client.toString(), consumerConfigurationData.toString());
- throw new RuntimeException("Init Pulsar client failed because unknown error.", e);
- }
- return consumer;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public void setSchema(Schema schema) {
- // Allow user use custom schema;
- this.schema = schema;
- }
-
- private Schema initSchema(SchemaType schemaType, Class schema) {
- switch (schemaType) {
- case INT8:
- return Schema.INT8;
- case INT16:
- return Schema.INT16;
- case INT32:
- return Schema.INT32;
- case INT64:
- return Schema.INT64;
- case STRING:
- return Schema.STRING;
- case FLOAT:
- return Schema.FLOAT;
- case DOUBLE:
- return Schema.DOUBLE;
- case BOOLEAN:
- return Schema.BOOL;
- case BYTES:
- return Schema.BYTES;
- case DATE:
- return Schema.DATE;
- case TIME:
- return Schema.TIME;
- case TIMESTAMP:
- return Schema.TIMESTAMP;
- case KEY_VALUE:
- return Schema.KV_BYTES();
- case JSON:
- return Schema.JSON(schema);
- case AVRO:
- return Schema.AVRO(schema);
- default:
- return Schema.BYTES;
- }
- }
-
- public void initConsumerConfig(ConsumerBuilder consumerBuilder) {
- Preconditions.checkArgument(consumerConfigurationData.getSubscriptionName() != null
- && consumerConfigurationData.getSubscriptionName().length() > 0,
- "The subscription name is incorrect");
- consumerBuilder.subscriptionName(consumerConfigurationData.getSubscriptionName());
- Preconditions.checkArgument(consumerConfigurationData.getSubscriptionType() != null,
- "The subscription type should be set correctly."
- + "Exclusive, Failover, Shared and Key_Shared are currently supported.");
- consumerBuilder.subscriptionType(consumerConfigurationData.getSubscriptionType());
- if (consumerConfigurationData.getTopics() != null) {
- List<String> topics = Arrays.asList(consumerConfigurationData.getTopics());
- topics.forEach((topic) -> {
- Preconditions.checkArgument(topic != null && topic.length() > 0 ,
- "Length of topic should be greater than 0");
- });
- consumerBuilder.topics(topics);
- }
- if (consumerConfigurationData.getTopicsPattern() != null
- && consumerConfigurationData.getTopicsPattern().length() > 0) {
- consumerBuilder.topicsPattern(consumerConfigurationData.getTopicsPattern());
- }
- if (consumerConfigurationData.getAckTimeout() != null) {
- Preconditions.checkArgument(consumerConfigurationData.getAckTimeout() >= 10,
- "Parameter ackTimeout cannot be less than 10s");
- consumerBuilder.ackTimeout(consumerConfigurationData.getAckTimeout(), TimeUnit.SECONDS);
- }
- if (consumerConfigurationData.getReceiverQueueSize() != null) {
- Preconditions.checkArgument(consumerConfigurationData.getReceiverQueueSize() > 0,
- "Parameter receiverQueueSize should be greater than 0");
- consumerBuilder.receiverQueueSize(consumerConfigurationData.getReceiverQueueSize());
- }
-
- if (consumerConfigurationData.getAcknowledgmentGroupTime() != null) {
- Preconditions.checkArgument(consumerConfigurationData.getAcknowledgmentGroupTime() >= 0,
- "Parameter acknowledgmentGroupTime cannot be less than 0");
- consumerBuilder.acknowledgmentGroupTime(
- consumerConfigurationData.getAcknowledgmentGroupTime(), TimeUnit.MILLISECONDS);
- }
- if (consumerConfigurationData.getConsumerName() != null
- && consumerConfigurationData.getConsumerName().length() > 0) {
- consumerBuilder.consumerName(consumerConfigurationData.getConsumerName());
- }
- if (consumerConfigurationData.getNegativeAckRedeliveryDelay() != null) {
- Preconditions.checkArgument(consumerConfigurationData.getNegativeAckRedeliveryDelay() >= 0,
- "Parameter negativeAckRedeliveryDelay cannot be less than 0");
- consumerBuilder.negativeAckRedeliveryDelay(
- consumerConfigurationData.getNegativeAckRedeliveryDelay(), TimeUnit.MILLISECONDS);
- }
- if (consumerConfigurationData.getPriorityLevel() != null) {
- Preconditions.checkArgument(consumerConfigurationData.getPriorityLevel() >= 0,
- "Parameter priorityLevel cannot be less than 0");
- consumerBuilder.priorityLevel(consumerConfigurationData.getPriorityLevel());
- }
- if (consumerConfigurationData.getRegexSubscriptionMode() != null) {
- consumerBuilder.subscriptionTopicsMode(consumerConfigurationData.getRegexSubscriptionMode());
- }
- }
-
- @Override
- public void close() throws PulsarClientException {
- if (consumer != null) {
- consumer.close();
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("subscriptionName", consumerConfigurationData.getSubscriptionName())
- .add("subscriptionType", consumerConfigurationData.getSubscriptionType())
- .add("topics", consumerConfigurationData.getTopics())
- .add("topicsPattern", consumerConfigurationData.getTopicsPattern())
- .add("ackTimeout", consumerConfigurationData.getAckTimeout())
- .add("receiverQueueSize", consumerConfigurationData.getReceiverQueueSize())
- .add("acknowledgmentGroupTime", consumerConfigurationData.getAcknowledgmentGroupTime())
- .add("consumerName", consumerConfigurationData.getConsumerName())
- .add("priorityLevel", consumerConfigurationData.getPriorityLevel())
- .add("negativeAckRedeliveryDelay", consumerConfigurationData.getNegativeAckRedeliveryDelay())
- .add("regexSubscriptionMode", consumerConfigurationData.getRegexSubscriptionMode())
- .toString();
- }
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainer.java b/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainer.java
deleted file mode 100644
index 0489181..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.consumer;
-
-import com.google.common.base.Preconditions;
-import io.streamnative.pulsar.manager.client.Client;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.SmartLifecycle;
-import org.springframework.core.task.SimpleAsyncTaskExecutor;
-import org.springframework.scheduling.SchedulingAwareRunnable;
-
-/**
- * Start PulsarConsumer
- */
-public class PulsarConsumerContainer implements SmartLifecycle {
-
- private static final Logger log = LoggerFactory.getLogger(PulsarConsumerContainer.class);
-
- private volatile boolean running = false;
-
- private final Client client;
-
- private final ConsumerConfigurationData consumerConfigurationData;
-
- private final ListenerConsumer listenerConsumer;
-
- public PulsarConsumerContainer(Client client, ConsumerConfigurationData consumerConfigurationData) {
- this.client = client;
- this.consumerConfigurationData = consumerConfigurationData;
- this.listenerConsumer = new ListenerConsumer(this.client, this.consumerConfigurationData);
- }
-
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public final void stop() {}
-
- @Override
- public void stop(Runnable callback) {
- this.setRunning(false);
- try {
- if (this.listenerConsumer != null) {
- this.listenerConsumer.close();
- }
- log.info("Close consumer success");
- } catch (PulsarClientException e) {
- log.error("Close consumer failed, because: {}", e.getMessage());
- }
- }
-
- @Override
- public final void start() {
- if (isRunning()) {
- return;
- }
- log.info("Start thread to received messages");
- SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor();
- this.setRunning(true);
- consumerExecutor.submitListenable(listenerConsumer);
- }
-
- @Override
- public boolean isRunning() {
- return this.running;
- }
-
- protected void setRunning(boolean running) {
- this.running = running;
- }
-
- @Override
- public int getPhase() {
- return 0;
- }
-
- public PulsarConsumer getPulsarConsumer() {
- return this.listenerConsumer.getPulsarConsumer();
- }
-
- private final class ListenerConsumer implements SchedulingAwareRunnable {
-
- private final PulsarConsumer pulsarConsumer;
-
- ListenerConsumer(Client client, ConsumerConfigurationData consumerConfigurationData) {
- pulsarConsumer = new PulsarConsumer(client, consumerConfigurationData);
- }
-
- public PulsarConsumer getPulsarConsumer() {
- return pulsarConsumer;
- }
-
- @Override
- public boolean isLongLived() {
- return true;
- }
-
- public void close() throws PulsarClientException{
- pulsarConsumer.close();
- }
-
- @Override
- public void run() {
- Consumer consumer = pulsarConsumer.getConsumer();
- Preconditions.checkNotNull(consumer, "Consumer is null, this is not allowed");
- while (isRunning()) {
- try {
- Message msg = consumer.receive();
- try {
- consumerConfigurationData.getMethod().invoke(consumerConfigurationData.getBean(), msg);
- consumer.acknowledgeAsync(msg);
- } catch (Exception e) {
- consumer.negativeAcknowledge(msg);
- log.warn("Message handle failed: {}, redeliver later", msg.toString());
- }
- } catch (PulsarClientException e) {
- log.error("Received message has a error: {}", e.getMessage());
- }
- }
- }
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/controller/BrokersController.java b/src/main/java/io/streamnative/pulsar/manager/controller/BrokersController.java
index 2e49a12..4e6b7d5 100644
--- a/src/main/java/io/streamnative/pulsar/manager/controller/BrokersController.java
+++ b/src/main/java/io/streamnative/pulsar/manager/controller/BrokersController.java
@@ -72,19 +72,5 @@
Map<String, Object> result = brokersService.getBrokersList(pageNum, pageSize, cluster, requestServiceUrl);
return ResponseEntity.ok(result);
}
-//
-// @ApiOperation(value = "Query broker info")
-// @ApiResponses({
-// @ApiResponse(code = 200, message = "ok"),
-// @ApiResponse(code = 500, message = "Internal server error")
-// })
-// @RequestMapping(value = "/brokers/{broker}", method = RequestMethod.GET)
-// public ResponseEntity<Optional<BrokerEntity>> getBroker(
-// @ApiParam(value = "The name of broker")
-// @Size(min = 1, max = 255)
-// @PathVariable String broker) {
-// Optional<BrokerEntity> brokersEntity = brokersRepository.findByBroker(broker);
-// return ResponseEntity.ok(brokersEntity);
-// }
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/controller/BundlesController.java b/src/main/java/io/streamnative/pulsar/manager/controller/BundlesController.java
deleted file mode 100644
index e400450..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/controller/BundlesController.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.controller;
-
-import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
-import io.streamnative.pulsar.manager.entity.BundleEntity;
-import io.streamnative.pulsar.manager.entity.BundlesRepository;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import org.hibernate.validator.constraints.Range;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.validation.constraints.Min;
-import javax.validation.constraints.Size;
-import java.util.Map;
-
-/**
- * Bundles rest api.
- */
-@RequestMapping(value = "/pulsar-manager/admin/v2")
-@Api(description = "Support more flexible queries to bundles.")
-@Validated
-@RestController
-public class BundlesController {
-
- @Autowired
- private BundlesRepository bundlesRepository;
-
- @ApiOperation(value = "Get the list of existing bundles, support paging, the default is 10 per page")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/bundles", method = RequestMethod.GET)
- public ResponseEntity<Map<String, Object>> getBundles(
- @ApiParam(value = "page_num", defaultValue = "1", example = "1")
- @RequestParam(name = "page_num", defaultValue = "1")
- @Min(value = 1, message = "page_num is incorrect, should be greater than 0.")
- Integer pageNum,
- @ApiParam(value = "page_size", defaultValue = "10", example = "10")
- @RequestParam(name="page_size", defaultValue = "10")
- @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
- Integer pageSize) {
- Page<BundleEntity> bundlesEntityPage = bundlesRepository.getBundlesList(pageNum, pageSize);
- Map<String, Object> result = Maps.newHashMap();
- result.put("total", bundlesEntityPage.getTotal());
- result.put("data", bundlesEntityPage);
- return ResponseEntity.ok(result);
- }
-
- /**
- * Query information a through broker or tenant b namespace or bundle.
- * @param brokerOrtenantsOrNamespaceOrbundle This word can be abbreviated to btnb
- * @param pageNum number of page
- * @param pageSize size of page
- * @return {"total": 10, "data": "[{"tenant": "tenant}, {"namespace": "namespace"}]"}
- */
- @ApiOperation(value = "Query list by the name of broker or tenant or namespace or bundle," +
- "support paging, the default is 10 per page")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/bundles/{brokerOrtenantsOrNamespaceOrbundle}", method = RequestMethod.GET)
- public ResponseEntity<Map<String, Object>> getBundlesBybtnb(
- @ApiParam(value = "The name of tenant or namespace.")
- @Size(min = 1, max = 255)
- @PathVariable String brokerOrtenantsOrNamespaceOrbundle,
- @ApiParam(value = "page_num", defaultValue = "1", example = "1")
- @RequestParam(name = "page_num", defaultValue = "1")
- @Min(value = 1, message = "page_num is incorrect, should be greater than 0.")
- Integer pageNum,
- @ApiParam(value = "page_size", defaultValue = "10", example = "10")
- @RequestParam(name="page_size", defaultValue = "10")
- @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
- Integer pageSize) {
- Page<BundleEntity> bundlesEntityPage = bundlesRepository
- .findByBrokerOrTenantOrNamespaceOrBundle(pageNum, pageSize, brokerOrtenantsOrNamespaceOrbundle);
- Map<String, Object> result = Maps.newHashMap();
- result.put("total", bundlesEntityPage.getTotal());
- result.put("data", bundlesEntityPage);
- return ResponseEntity.ok(result);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/controller/ClustersController.java b/src/main/java/io/streamnative/pulsar/manager/controller/ClustersController.java
index 12b2120..0db4043 100644
--- a/src/main/java/io/streamnative/pulsar/manager/controller/ClustersController.java
+++ b/src/main/java/io/streamnative/pulsar/manager/controller/ClustersController.java
@@ -13,8 +13,6 @@
*/
package io.streamnative.pulsar.manager.controller;
-import io.streamnative.pulsar.manager.entity.ClusterEntity;
-import io.streamnative.pulsar.manager.entity.ClustersRepository;
import io.streamnative.pulsar.manager.service.ClustersService;
import io.streamnative.pulsar.manager.service.EnvironmentCacheService;
import io.swagger.annotations.Api;
@@ -26,7 +24,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
@@ -34,9 +31,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.Min;
-import javax.validation.constraints.Size;
import java.util.Map;
-import java.util.Optional;
/**
* Cluster rest api
@@ -48,9 +43,6 @@
public class ClustersController {
@Autowired
- private ClustersRepository clustersRepository;
-
- @Autowired
private ClustersService clusterService;
@Autowired
@@ -87,18 +79,4 @@
return ResponseEntity.ok(result);
}
-
- @ApiOperation(value = "Query cluster info")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/clusters/{cluster}", method = RequestMethod.GET)
- public ResponseEntity<Optional<ClusterEntity>> getCluster(
- @ApiParam(value = "The name of cluster")
- @Size(min = 1, max = 255)
- @PathVariable String cluster) {
- Optional<ClusterEntity> clustersEntity = clustersRepository.findByCluster(cluster);
- return ResponseEntity.ok(clustersEntity);
- }
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/controller/NamespacesController.java b/src/main/java/io/streamnative/pulsar/manager/controller/NamespacesController.java
index 6faefa4..7a35a19 100644
--- a/src/main/java/io/streamnative/pulsar/manager/controller/NamespacesController.java
+++ b/src/main/java/io/streamnative/pulsar/manager/controller/NamespacesController.java
@@ -13,10 +13,6 @@
*/
package io.streamnative.pulsar.manager.controller;
-import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
-import io.streamnative.pulsar.manager.entity.NamespaceEntity;
-import io.streamnative.pulsar.manager.entity.NamespacesRepository;
import io.streamnative.pulsar.manager.service.EnvironmentCacheService;
import io.streamnative.pulsar.manager.service.NamespacesService;
import io.swagger.annotations.Api;
@@ -38,7 +34,6 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.Size;
import java.util.Map;
-import java.util.Optional;
/**
* Namespace Controller class
@@ -50,9 +45,6 @@
public class NamespacesController {
@Autowired
- private NamespacesRepository namespacesRepository;
-
- @Autowired
private NamespacesService namespacesService;
@Autowired
@@ -61,28 +53,6 @@
@Autowired
private HttpServletRequest request;
- @ApiOperation(value = "Get the list of existing namespaces, support paging, the default is 10 per page")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/namespaces", method = RequestMethod.GET)
- public ResponseEntity<Map<String, Object>> getNamespaces(
- @ApiParam(value = "page_num", defaultValue = "1", example = "1")
- @RequestParam(name = "page_num", defaultValue = "1")
- @Min(value = 1, message = "page_num is incorrect, should be greater than 0.")
- Integer pageNum,
- @ApiParam(value = "page_size", defaultValue = "10", example = "10")
- @RequestParam(name="page_size", defaultValue = "10")
- @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
- Integer pageSize) {
- Page<NamespaceEntity> namespacesEntities = namespacesRepository.getNamespacesList(pageNum, pageSize);
- Map<String, Object> result = Maps.newHashMap();
- result.put("total", namespacesEntities.getTotal());
- result.put("data", namespacesEntities);
- return ResponseEntity.ok(result);
- }
-
@ApiOperation(value = "Query list by the name of tenant or namespace, support paging, the default is 10 per page")
@ApiResponses({
@ApiResponse(code = 200, message = "ok"),
@@ -106,23 +76,6 @@
return ResponseEntity.ok(result);
}
- @ApiOperation(value = "Query namespace info by tenant and namespace")
- @ApiResponses({
- @ApiResponse(code = 200, message = "ok"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- @RequestMapping(value = "/namespaces/{tenant}/{namespace}", method = RequestMethod.GET)
- public ResponseEntity<Optional<NamespaceEntity>> getNamespacesByTenantNamespace(
- @ApiParam(value = "The name of tenant")
- @Size(min = 1, max = 255)
- @PathVariable String tenant,
- @ApiParam(value = "The name of namespace")
- @Size(min = 1, max = 255)
- @PathVariable String namespace) {
- Optional<NamespaceEntity> namespacesEntity = namespacesRepository.findByTenantNamespace(tenant, namespace);
- return ResponseEntity.ok(namespacesEntity);
- }
-
@ApiOperation(value = "Query namespace stats info by tenant and namespace")
@ApiResponses({
@ApiResponse(code = 200, message = "ok"),
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImpl.java
deleted file mode 100644
index 788ebca..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImpl.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import io.streamnative.pulsar.manager.entity.BrokerEntity;
-import io.streamnative.pulsar.manager.entity.BrokersRepository;
-import io.streamnative.pulsar.manager.mapper.BrokersMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-/**
- * BrokersRepositoryImpl implements BrokersRepository for operation crud of broker.
- */
-@Repository
-public class BrokersRepositoryImpl implements BrokersRepository {
-
- private final BrokersMapper brokersMapper;
-
- @Autowired
- public BrokersRepositoryImpl(BrokersMapper brokersMapper) {
- this.brokersMapper = brokersMapper;
- }
-
- @Override
- public Optional<BrokerEntity> findById(long brokerId) {
- return Optional.ofNullable(brokersMapper.findById(brokerId));
- }
-
- @Override
- public Optional<BrokerEntity> findByBroker(String broker) {
- return Optional.ofNullable(brokersMapper.findByBroker(broker));
- }
-
- @Override
- public Page<BrokerEntity> getBrokersList(Integer pageNum, Integer pageSize) {
- PageHelper.startPage(pageNum, pageSize);
- Page<BrokerEntity> brokersEntities = brokersMapper.getBrokersList();
- return brokersEntities;
- }
-
- @Override
- public void save(BrokerEntity brokersEntity) {
- brokersMapper.insert(brokersEntity);
- }
-
- @Override
- public void remove(String broker) {
- brokersMapper.deleteByBroker(broker);
- }
-
- @Override
- public void update(BrokerEntity brokersEntity) {
- brokersMapper.update(brokersEntity);
- }
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/BundlesRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/BundlesRepositoryImpl.java
deleted file mode 100644
index 9eb97b6..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/dao/BundlesRepositoryImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import io.streamnative.pulsar.manager.entity.BundleEntity;
-import io.streamnative.pulsar.manager.entity.BundlesRepository;
-import io.streamnative.pulsar.manager.mapper.BundlesMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-/**
- * BundlesRepositoryImpl implements BundlesRepository for operation crud of bundle.
- */
-@Repository
-public class BundlesRepositoryImpl implements BundlesRepository {
-
- private final BundlesMapper bundlesMapper;
-
- @Autowired
- public BundlesRepositoryImpl(BundlesMapper bundlesMapper) {
- this.bundlesMapper = bundlesMapper;
- }
-
- @Override
- public Page<BundleEntity> findByBundle(Integer pageNum, Integer pageSize, String bundle) {
- PageHelper.startPage(pageNum, pageSize);
- return bundlesMapper.findByBundle(bundle);
- }
-
-
- @Override
- public Page<BundleEntity> findByBrokerOrTenantOrNamespaceOrBundle(Integer pageNum, Integer pageSize, String btnb) {
- PageHelper.startPage(pageNum, pageSize);
- Page<BundleEntity> bundlesEntities = bundlesMapper.findByBrokerOrTenantOrNamespaceOrBundle(btnb);
- return bundlesEntities;
- }
-
- @Override
- public Page<BundleEntity> getBundlesList(Integer pageNum, Integer pageSize) {
- PageHelper.startPage(pageNum, pageSize);
- Page<BundleEntity> bundlesEntities = bundlesMapper.getBundlesList();
- return bundlesEntities;
- }
-
- @Override
- public void remove(String broker, String tenant, String namespace, String bundle) {
- bundlesMapper.delete(broker, tenant, namespace, bundle);
- }
-
- @Override
- public void save(BundleEntity bundlesEntity) {
- bundlesMapper.insert(bundlesEntity);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImpl.java
deleted file mode 100644
index 491b912..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import io.streamnative.pulsar.manager.entity.ClusterEntity;
-import io.streamnative.pulsar.manager.entity.ClustersRepository;
-import io.streamnative.pulsar.manager.mapper.ClustersMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-/**
- * ClustersRepositoryImpl implements ClustersRepository for operation crud of bundle.
- */
-@Repository
-public class ClustersRepositoryImpl implements ClustersRepository {
-
- private final ClustersMapper clustersMapper;
-
- @Autowired
- public ClustersRepositoryImpl(ClustersMapper clustersMapper) {
- this.clustersMapper = clustersMapper;
- }
-
- @Override
- public Optional<ClusterEntity> findById(long clusterId) {
- return Optional.ofNullable(clustersMapper.findById(clusterId));
- }
-
- @Override
- public Optional<ClusterEntity> findByCluster(String cluster) {
- return Optional.ofNullable(clustersMapper.findByCluster(cluster));
- }
-
- @Override
- public Page<ClusterEntity> getClustersList(Integer pageNum, Integer pageSize) {
- PageHelper.startPage(pageNum, pageSize);
- Page<ClusterEntity> clustersEntities = clustersMapper.getClustersList();
- return clustersEntities;
- }
-
- @Override
- public void save(ClusterEntity clustersEntity) {
- clustersMapper.insert(clustersEntity);
- }
-
-
- @Override
- public void remove(String cluster) {
- clustersMapper.delete(cluster);
- }
-
-
- @Override
- public void update(ClusterEntity clustersEntity) {
- clustersMapper.update(clustersEntity);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImpl.java
deleted file mode 100644
index f11f18a..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import io.streamnative.pulsar.manager.entity.NamespaceEntity;
-import io.streamnative.pulsar.manager.entity.NamespacesRepository;
-import io.streamnative.pulsar.manager.mapper.NamespacesMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-@Repository
-public class NamespacesRepositoryImpl implements NamespacesRepository {
-
- private final NamespacesMapper namespacesMapper;
-
- @Autowired
- public NamespacesRepositoryImpl(NamespacesMapper namespacesMapper) {
- this.namespacesMapper = namespacesMapper;
- }
-
- @Override
- public Optional<NamespaceEntity> findById(long namespaceId) {
- return Optional.ofNullable(namespacesMapper.findById(namespaceId));
- }
-
- @Override
- public Optional<NamespaceEntity> findByTenantNamespace(String tenant, String namespace) {
- return Optional.ofNullable(namespacesMapper.findByTenantNamespace(tenant, namespace));
- }
-
- @Override
- public Page<NamespaceEntity> findByTenantOrNamespace(Integer pageNum, Integer pageSize, String tenantOrNamespace) {
- PageHelper.startPage(pageNum, pageSize);
- Page<NamespaceEntity> namespacesEntities = namespacesMapper.findByTenantOrNamespace(tenantOrNamespace);
- return namespacesEntities;
- }
-
- @Override
- public Page<NamespaceEntity> findByNamespace(Integer pageNum, Integer pageSize, String namespace) {
- PageHelper.startPage(pageNum, pageSize);
- Page<NamespaceEntity> namespacesEntities = namespacesMapper.findByNamespace(namespace);
- return namespacesEntities;
- }
-
- @Override
- public Page<NamespaceEntity> getNamespacesList(Integer pageNum, Integer pageSize) {
- PageHelper.startPage(pageNum, pageSize);
- Page<NamespaceEntity> namespacesEntities = namespacesMapper.getNamespacesList();
- return namespacesEntities;
- }
-
- @Override
- public Page<NamespaceEntity> findByTenant(Integer pageNum, Integer pageSize, String tenant) {
- PageHelper.startPage(pageNum, pageSize);
- Page<NamespaceEntity> namespacesEntities = namespacesMapper.findByTenant(tenant);
- return namespacesEntities;
- }
-
- @Override
- public void remove(String tenant, String namespace) {
- namespacesMapper.deleteByTenantNamespace(tenant, namespace);
- }
-
- @Override
- public void update(NamespaceEntity namespacesEntity) {
- namespacesMapper.updateByTenantNamespace(namespacesEntity);
- }
-
- @Override
- public void save(NamespaceEntity namespacesEntity) {
- namespacesMapper.insert(namespacesEntity);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImpl.java
deleted file mode 100644
index c1d0121..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImpl.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import io.streamnative.pulsar.manager.entity.TenantsRepository;
-import io.streamnative.pulsar.manager.entity.TenantEntity;
-import io.streamnative.pulsar.manager.mapper.TenantsMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-@Repository
-public class TenantsRepositoryImpl implements TenantsRepository {
- private final TenantsMapper tenantsMapper;
-
- @Autowired
- public TenantsRepositoryImpl(TenantsMapper tenantsMapper) { this.tenantsMapper = tenantsMapper; }
-
- @Override
- public Optional<TenantEntity> findById(long tenantId) {
- return Optional.ofNullable(tenantsMapper.findById(tenantId));
- }
-
- @Override
- public Optional<TenantEntity> findByName(String tenant) {
- return Optional.ofNullable(tenantsMapper.findByName(tenant));
- }
-
- @Override
- public Page<TenantEntity> getTenantsList(Integer pageNum, Integer pageSize) {
- PageHelper.startPage(pageNum, pageSize);
- Page<TenantEntity> tenantsEntities = tenantsMapper.getTenantsList();
- return tenantsEntities;
- }
-
- @Override
- public void save(TenantEntity tenantsEntity) {
- tenantsMapper.insert(tenantsEntity);
- }
-
-
- @Override
- public void remove(Long tenantId) {
- tenantsMapper.delete(tenantId);
- }
-
- @Override
- public void removeByTenant(String tenant) {
- tenantsMapper.deleteByTenant(tenant);
- }
-
- @Override
- public void updateByTenant(String tenant) {
- tenantsMapper.updateByTenant(tenant);
- }
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BrokerEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/BrokerEntity.java
deleted file mode 100644
index 8c67ef8..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/BrokerEntity.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-/**
- * Broker entity.
- */
-@Getter
-@Setter
-@NoArgsConstructor
-public class BrokerEntity {
- private long brokerId;
- private String broker;
- // URLs to satisfy contract of ServiceLookupData (used by NamespaceService).
- private String webServiceUrl;
- private String webServiceUrlTls;
- private String pulsarServiceUrl;
- private String pulsarServiceUrlTls;
- private boolean persistentTopicsEnabled = true;
- private boolean nonPersistentTopicsEnabled = true;
-
- private String brokerVersionString;
-
- private String loadReportType;
-
- private double maxResourceUsage;
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BrokersRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/BrokersRepository.java
deleted file mode 100644
index e4af3cc..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/BrokersRepository.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import com.github.pagehelper.Page;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-/**
- * Interface of brokers
- */
-@Repository
-public interface BrokersRepository {
-
- void save(BrokerEntity brokersEntity);
-
- Optional<BrokerEntity> findById(long brokerId);
-
- Optional<BrokerEntity> findByBroker(String broker);
-
- Page<BrokerEntity> getBrokersList(Integer pageNum, Integer pageSize);
-
- void remove(String broker);
-
- void update(BrokerEntity brokersEntity);
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BundleEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/BundleEntity.java
deleted file mode 100644
index a9df264..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/BundleEntity.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-/**
- * Bundle entity.
- */
-@Getter
-@Setter
-@NoArgsConstructor
-public class BundleEntity {
-
- private String broker;
-
- private String tenant;
-
- private String namespace;
-
- private String bundle;
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BundlesRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/BundlesRepository.java
deleted file mode 100644
index f34b0e7..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/BundlesRepository.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import com.github.pagehelper.Page;
-import org.springframework.stereotype.Repository;
-
-/**
- * Interface of bundles
- */
-@Repository
-public interface BundlesRepository {
-
- void save(BundleEntity bundlesEntity);
-
- Page<BundleEntity> findByBrokerOrTenantOrNamespaceOrBundle(Integer pageNum, Integer pageSize, String btnt);
-
- Page<BundleEntity> findByBundle(Integer pageNum, Integer pageSize, String bundle);
-
- Page<BundleEntity> getBundlesList(Integer pageNum, Integer pageSize);
-
- void remove(String broker, String tenant, String namespace, String bundle);
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/ClusterEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/ClusterEntity.java
deleted file mode 100644
index 6e1e16a..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/ClusterEntity.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-public class ClusterEntity {
- private long clusterId;
- private String cluster;
-
- private String serviceUrl;
-
- private String serviceUrlTls;
-
- private String brokerServiceUrl;
-
- private String brokerServiceUrlTls;
-
- private String peerClusterNames;
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/ClustersRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/ClustersRepository.java
deleted file mode 100644
index a92047d..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/ClustersRepository.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import com.github.pagehelper.Page;
-import org.springframework.stereotype.Repository;
-
-import java.util.Optional;
-
-@Repository
-public interface ClustersRepository {
-
- void save(ClusterEntity clustersEntity);
-
- Optional<ClusterEntity> findById(long clusterId);
-
- Optional<ClusterEntity> findByCluster(String cluster);
-
- Page<ClusterEntity> getClustersList(Integer pageNum, Integer pageSize);
-
- void remove(String cluster);
-
- void update(ClusterEntity clustersEntity);
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/NamespaceEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/NamespaceEntity.java
deleted file mode 100644
index e79a8d7..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/NamespaceEntity.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-public class NamespaceEntity {
- private long namespaceId;
- private String tenant;
- private String namespace;
-
- private String authPolicies;
- private String replicationClusters;
-
- private int numBundles;
- private String boundaries;
- private String backlogQuota;
- private String topicDispatchRate;
- private String subscriptionDispatchRate;
- private String replicatorDispatchRate;
- private String clusterSubscribeRate;
-
-
- private int bookkeeperEnsemble;
- private int bookkeeperWriteQuorum;
- private int bookkeeperAckQuorum;
- private double managedLedgerMaxMarkDeleteRate;
- // If set, it will override the broker settings for enabling deduplication
- private Boolean deduplicationEnabled;
-
- private String latencyStatsSampleRate;
- private int messageTtlInSeconds;
-
- private int retentionTimeInMinutes;
- private long retentionSizeInMB;
- private boolean deleted;
- private String antiAffinityGroup;
-
- private boolean encryptionRequired;
- private String subscriptionAuthMode;
-
- private int maxProducersPerTopic;
- private int maxConsumersPerTopic;
- private int maxConsumersPerSubscription;
-
- private long compactionThreshold;
- private long offloadThreshold;
- private Long offloadDeletionLagMs;
-
- private String schemaAutoApdateCompatibilityStrategy;
-
- private boolean schemaValidationEnforced;
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/NamespacesRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/NamespacesRepository.java
deleted file mode 100644
index 45a650b..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/NamespacesRepository.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import org.springframework.stereotype.Repository;
-import com.github.pagehelper.Page;
-
-import java.util.Optional;
-
-@Repository
-public interface NamespacesRepository {
-
- void save(NamespaceEntity namespacesEntity);
-
- Optional<NamespaceEntity> findById(long namespaceId);
-
- Optional<NamespaceEntity> findByTenantNamespace(String tenant, String namespace);
-
- Page<NamespaceEntity> findByTenantOrNamespace(Integer pageNum, Integer pageSize, String tenantOrNamespace);
-
- Page<NamespaceEntity> findByNamespace(Integer pageNum, Integer pageSize, String namespace);
-
- Page<NamespaceEntity> getNamespacesList(Integer pageNum, Integer pageSize);
-
- Page<NamespaceEntity> findByTenant(Integer pageNum, Integer pageSize, String tenant);
-
- void remove(String tenant, String namespace);
-
- void update(NamespaceEntity namespacesEntity);
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/TenantEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/TenantEntity.java
deleted file mode 100644
index c4352eb..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/TenantEntity.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-/**
- * Tenants entity
- * tenant -> name of tenant
- */
-@Getter
-@Setter
-@NoArgsConstructor
-@AllArgsConstructor
-public class TenantEntity {
- private long tenantId;
- private String tenant;
- private String adminRoles;
- private String allowedClusters;
-}
\ No newline at end of file
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/TenantsRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/TenantsRepository.java
deleted file mode 100644
index 10eb06f..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/entity/TenantsRepository.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.entity;
-
-import java.util.Optional;
-
-import com.github.pagehelper.Page;
-import org.springframework.stereotype.Repository;
-
-@Repository
-public interface TenantsRepository {
-
- void save(TenantEntity tenantsEntity);
-
- Optional<TenantEntity> findById(long tenantId);
-
- Optional<TenantEntity> findByName(String tenant);
-
- Page<TenantEntity> getTenantsList(Integer pageNum, Integer pageSize);
-
- void remove(Long tenantId);
-
- void removeByTenant(String tenant);
-
- void updateByTenant(String tenant);
-}
-
-
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/BrokersMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/BrokersMapper.java
deleted file mode 100644
index aa25ce0..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/BrokersMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.mapper;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.entity.BrokerEntity;
-import org.apache.ibatis.annotations.*;
-
-/**
- * Insert, delete, udpate and query for Broker.
- */
-@Mapper
-public interface BrokersMapper {
-
- @Insert("INSERT INTO brokers(brokerId,broker,webServiceUrl,webServiceUrlTls,pulsarServiceUrl,pulsarServiceUrlTls," +
- "persistentTopicsEnabled,nonPersistentTopicsEnabled,brokerVersionString,loadReportType,maxResourceUsage) " +
- "VALUES(#{brokerId}, #{broker},#{webServiceUrl},#{webServiceUrlTls},#{pulsarServiceUrl}," +
- "#{pulsarServiceUrlTls},#{persistentTopicsEnabled},#{nonPersistentTopicsEnabled}," +
- "#{brokerVersionString},#{loadReportType},#{maxResourceUsage})")
- void insert(BrokerEntity brokersEntity);
-
- @Select("SELECT broker,webServiceUrl,webServiceUrlTls,pulsarServiceUrl,pulsarServiceUrlTls," +
- "persistentTopicsEnabled,nonPersistentTopicsEnabled,brokerVersionString,loadReportType,maxResourceUsage " +
- "FROM brokers WHERE brokerId=#{brokerId}")
- BrokerEntity findById(long brokerId);
-
- @Select("SELECT brokerId,broker,webServiceUrl,webServiceUrlTls,pulsarServiceUrl,pulsarServiceUrlTls," +
- "persistentTopicsEnabled,nonPersistentTopicsEnabled,brokerVersionString,loadReportType,maxResourceUsage " +
- "FROM brokers WHERE broker=#{broker}")
- BrokerEntity findByBroker(String broker);
-
- @Select("SELECT brokerId,broker,webServiceUrl,webServiceUrlTls,pulsarServiceUrl,pulsarServiceUrlTls," +
- "persistentTopicsEnabled,nonPersistentTopicsEnabled,brokerVersionString,loadReportType,maxResourceUsage " +
- "FROM brokers")
- Page<BrokerEntity> getBrokersList();
-
- @Update("UPDATE brokers set webServiceUrl=#{webServiceUrl},webServiceUrlTls=#{webServiceUrlTls}," +
- "pulsarServiceUrl=#{pulsarServiceUrl},pulsarServiceUrlTls=#{pulsarServiceUrlTls}," +
- "persistentTopicsEnabled=#{persistentTopicsEnabled}," +
- "nonPersistentTopicsEnabled=#{nonPersistentTopicsEnabled},brokerVersionString=#{brokerVersionString}," +
- "loadReportType=#{loadReportType},maxResourceUsage=#{maxResourceUsage} WHERE broker=#{broker}")
- void update(BrokerEntity brokersEntity);
-
- @Delete("DELETE FROM brokers WHERE brokerId=#{brokerId}")
- void deleteByBrokerId(Integer brokerId);
-
- @Delete("DELETE FROM brokers WHERE broker=#{broker}")
- void deleteByBroker(String broker);
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/BundlesMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/BundlesMapper.java
deleted file mode 100644
index 386e580..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/BundlesMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.mapper;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.entity.BundleEntity;
-import org.apache.ibatis.annotations.*;
-
-/**
- * Insert, delete, udpate and query for Bundle.
- */
-@Mapper
-public interface BundlesMapper {
-
- @Insert("INSERT INTO bundles(broker,tenant,namespace,bundle) VALUES(#{broker},#{tenant},#{namespace},#{bundle})")
- void insert(BundleEntity bundlesEntity);
-
- @Select("SELECT broker,tenant,namespace,bundle FROM bundles WHERE broker=#{btnb} or " +
- "tenant=#{btnb} or namespace=#{btnb} or bundle=#{btnb}")
- Page<BundleEntity> findByBrokerOrTenantOrNamespaceOrBundle(String btnb);
-
- @Select("SELECT broker,tenant,namespace,bundle FROM bundles WHERE bundle=#{bundle}")
- Page<BundleEntity> findByBundle(String bundle);
-
- @Select("SELECT broker,tenant,namespace,bundle FROM bundles")
- Page<BundleEntity> getBundlesList();
-
- @Delete("DELETE FROM bundles WHERE broker=#{broker} and tenant=#{tenant} " +
- "and namespace=#{namespace} and bundle=#{bundle}")
- void delete(@Param("broker") String broker, @Param("tenant") String tenant,
- @Param("namespace") String namespace, @Param("bundle") String bundle);
-
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/ClustersMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/ClustersMapper.java
deleted file mode 100644
index b38f904..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/ClustersMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.mapper;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.entity.ClusterEntity;
-import org.apache.ibatis.annotations.*;
-
-@Mapper
-public interface ClustersMapper {
-
- @Insert("INSERT INTO clusters(clusterId,cluster,serviceUrl,serviceUrlTls,brokerServiceUrl,brokerServiceUrlTls," +
- "peerClusterNames) VALUES(#{clusterId},#{cluster},#{serviceUrl},#{serviceUrlTls},#{brokerServiceUrl}," +
- "#{brokerServiceUrlTls},#{peerClusterNames})")
- void insert(ClusterEntity clustersEntity);
-
- @Select("SELECT clusterId,cluster,serviceUrl,serviceUrlTls,brokerServiceUrl,brokerServiceUrlTls,peerClusterNames " +
- "FROM clusters WHERE clusterId = #{clusterId}")
- ClusterEntity findById(long clusterId);
-
- @Select("SELECT clusterId,cluster,serviceUrl,serviceUrlTls,brokerServiceUrl,brokerServiceUrlTls,peerClusterNames " +
- "FROM clusters WHERE cluster = #{cluster}")
- ClusterEntity findByCluster(String cluster);
-
- @Select("SELECT clusterId,cluster,serviceUrl,serviceUrlTls,brokerServiceUrl,brokerServiceUrlTls,peerClusterNames " +
- "FROM clusters")
- Page<ClusterEntity> getClustersList();
-
-
- @Delete("DELETE FROM clusters WHERE clusterId = #{clusterId}")
- void deleteByClusterId(Integer clusterId);
-
- @Update("UPDATE clusters set serviceUrl=#{serviceUrl},serviceUrlTls=#{serviceUrlTls}," +
- "brokerServiceUrl=#{brokerServiceUrl},brokerServiceUrlTls=#{brokerServiceUrlTls}," +
- "peerClusterNames={peerClusterNames} FROM clusters where cluster=#{cluster")
- void update(ClusterEntity clustersEntity);
-
- @Delete("DELETE FROM clusters WHERE cluster=#{cluster}")
- void delete(String cluster);
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/NamespacesMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/NamespacesMapper.java
deleted file mode 100644
index 6c1dcf8..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/NamespacesMapper.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.mapper;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.entity.NamespaceEntity;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Insert;
-import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
-
-@Mapper
-public interface NamespacesMapper {
-
- @Insert("INSERT INTO namespaces(namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters," +
- "numBundles,boundaries,topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate," +
- "clusterSubscribeRate,bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum," +
- "managedLedgerMaxMarkDeleteRate,deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds," +
- "retentionTimeInMinutes,retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired," +
- "subscriptionAuthMode,maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription," +
- "compactionThreshold,offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy) VALUES(#{namespaceId},#{tenant},#{namespace},#{authPolicies}," +
- "#{backlogQuota},#{replicationClusters},#{numBundles}," +
- "#{boundaries},#{topicDispatchRate},#{subscriptionDispatchRate},#{replicatorDispatchRate}," +
- "#{clusterSubscribeRate},#{bookkeeperEnsemble},#{bookkeeperWriteQuorum},#{bookkeeperAckQuorum}," +
- "#{managedLedgerMaxMarkDeleteRate},#{deduplicationEnabled},#{latencyStatsSampleRate}," +
- "#{messageTtlInSeconds},#{retentionTimeInMinutes},#{retentionSizeInMB},#{deleted}," +
- "#{antiAffinityGroup},#{encryptionRequired},#{subscriptionAuthMode},#{maxProducersPerTopic}," +
- "#{maxConsumersPerTopic},#{maxConsumersPerSubscription},#{compactionThreshold},#{offloadThreshold}," +
- "#{offloadDeletionLagMs},#{schemaValidationEnforced},#{schemaAutoApdateCompatibilityStrategy})")
- void insert(NamespaceEntity namespacesEntity);
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces WHERE namespaceId = #{namespaceId}")
- NamespaceEntity findById(long namespaceId);
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces WHERE tenant=#{tenant} and namespace=#{namespace}")
- NamespaceEntity findByTenantNamespace(String tenant, String namespace);
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces " +
- "WHERE tenant=#{tenantOrNamespace} or namespace=#{tenantOrNamespace}")
- Page<NamespaceEntity> findByTenantOrNamespace(String tenantOrNamespace);
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces WHERE namespace=#{namespace}")
- Page<NamespaceEntity> findByNamespace(String namespace);
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces WHERE tenant=#{tenant}")
- Page<NamespaceEntity> findByTenant(String tenant);
-
-
- @Select("SELECT namespaceId,tenant,namespace,authPolicies,backlogQuota,replicationClusters,numBundles,boundaries," +
- "topicDispatchRate,subscriptionDispatchRate,replicatorDispatchRate,clusterSubscribeRate," +
- "bookkeeperEnsemble,bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate," +
- "deduplicationEnabled,latencyStatsSampleRate,messageTtlInSeconds,retentionTimeInMinutes," +
- "retentionSizeInMB,deleted,antiAffinityGroup,encryptionRequired,subscriptionAuthMode," +
- "maxProducersPerTopic,maxConsumersPerTopic,maxConsumersPerSubscription,compactionThreshold," +
- "offloadThreshold,offloadDeletionLagMs,schemaValidationEnforced," +
- "schemaAutoApdateCompatibilityStrategy FROM namespaces")
- Page<NamespaceEntity> getNamespacesList();
-
- @Delete("DELETE FROM namespaces WHERE namespaceId = #{namespaceId}")
- void deleteByNamespaceId(long namespaceId);
-
- @Update("UPDATE namespaces set authPolicies=#{authPolicies},backlogQuota=#{backlogQuota}," +
- "replicationClusters=#{replicationClusters}," +
- "numBundles=#{numBundles},boundaries=#{boundaries},topicDispatchRate=#{topicDispatchRate}," +
- "subscriptionDispatchRate=#{subscriptionDispatchRate},replicatorDispatchRate=#{replicatorDispatchRate}," +
- "clusterSubscribeRate=#{clusterSubscribeRate},bookkeeperEnsemble=#{bookkeeperEnsemble}," +
- "bookkeeperWriteQuorum=#{bookkeeperWriteQuorum},bookkeeperAckQuorum=#{bookkeeperAckQuorum}," +
- "managedLedgerMaxMarkDeleteRate=#{managedLedgerMaxMarkDeleteRate}," +
- "deduplicationEnabled=#{deduplicationEnabled},latencyStatsSampleRate=#{latencyStatsSampleRate}," +
- "messageTtlInSeconds=#{messageTtlInSeconds},retentionTimeInMinutes=#{retentionTimeInMinutes}," +
- "retentionSizeInMB=#{retentionSizeInMB},deleted=#{deleted},antiAffinityGroup=#{antiAffinityGroup}," +
- "encryptionRequired=#{encryptionRequired},subscriptionAuthMode=#{subscriptionAuthMode}," +
- "maxProducersPerTopic=#{maxProducersPerTopic},maxConsumersPerTopic=#{maxConsumersPerTopic}," +
- "maxConsumersPerSubscription=#{maxConsumersPerSubscription},compactionThreshold=#{compactionThreshold}," +
- "offloadThreshold=#{offloadThreshold},offloadDeletionLagMs=#{offloadDeletionLagMs}," +
- "schemaValidationEnforced=#{schemaValidationEnforced}," +
- "schemaAutoApdateCompatibilityStrategy=#{schemaAutoApdateCompatibilityStrategy} " +
- "where tenant = #{tenant} and namespace = #{namespace})")
- void updateByTenantNamespace(NamespaceEntity namespacesEntity);
-
- @Delete("DELETE FROM namespaces WHERE tenant = #{tenant} and namespace = #{namespace}")
- void deleteByTenantNamespace(@Param("tenant") String tenant, @Param("namespace") String namespace);
-}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/TenantsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/TenantsMapper.java
deleted file mode 100644
index 441fa4b..0000000
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/TenantsMapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.mapper;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.entity.TenantEntity;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Insert;
-import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
-
-@Mapper
-public interface TenantsMapper {
-
- @Insert("INSERT INTO tenants(tenant,tenantId,adminRoles,allowedClusters) " +
- "VALUES(#{tenant},#{tenantId},#{adminRoles},#{allowedClusters})")
- void insert(TenantEntity tenantsEntity);
-
- @Select("SELECT tenant,adminRoles,allowedClusters FROM tenants WHERE tenantId = #{tenantId}")
- TenantEntity findById(long tenantId);
-
- @Select("SELECT tenantId,tenant,adminRoles,allowedClusters FROM tenants WHERE tenant = #{tenant}")
- TenantEntity findByName(String tenant);
-
- @Select("SELECT tenantId,tenant,adminRoles,allowedClusters FROM tenants")
- Page<TenantEntity> getTenantsList();
-
- @Delete("DELETE FROM tenants WHERE tenantId = #{tenantId}")
- void delete(Long tenantId);
-
- @Delete("DELETE FROM tenants WHERE tenant = #{tenant}")
- void deleteByTenant(String tenant);
-
- @Update("UPDATE tenants set adminRoles = #{adminRoles}, allowedClusters = #{allowedClusters} where tenant = #{tenant}")
- void updateByTenant(String tenant);
-}
diff --git a/src/main/resources/META-INF/sql/data.sql b/src/main/resources/META-INF/sql/data.sql
deleted file mode 100644
index 923ae40..0000000
--- a/src/main/resources/META-INF/sql/data.sql
+++ /dev/null
@@ -1,25 +0,0 @@
---
--- Licensed under the Apache License, Version 2.0 (the "License");
--- you may not use this file except in compliance with the License.
--- You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-INSERT INTO tenants (tenant) VALUES ('Paul1');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul2', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul3', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul4', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul5', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul6', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul7', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul8', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul9', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul10', 'role1,role2', 'standalone');
-INSERT INTO tenants (tenant, adminRoles, allowedClusters) VALUES ('Paul11', 'role1,role2', 'standalone');
\ No newline at end of file
diff --git a/src/main/resources/META-INF/sql/mysql-schema.sql b/src/main/resources/META-INF/sql/mysql-schema.sql
index a4d12e6..023941d 100644
--- a/src/main/resources/META-INF/sql/mysql-schema.sql
+++ b/src/main/resources/META-INF/sql/mysql-schema.sql
@@ -16,93 +16,6 @@
USE pulsar_manager;
-CREATE TABLE IF NOT EXISTS tenants (
- tenant varchar(255) NOT NULL PRIMARY KEY,
- tenantId BIGINT NOT NULL,
- adminRoles TEXT,
- allowedClusters TEXT,
- UNIQUE (tenantId)
-) ENGINE=InnoDB CHARACTER SET utf8;
-
-
-CREATE TABLE IF NOT EXISTS namespaces (
- namespaceId BIGINT NOT NULL,
- tenant varchar(255) NOT NULL,
- namespace varchar(255) NOT NULL,
- authPolicies TEXT,
- backlogQuota TEXT,
- replicationClusters TEXT,
- numBundles BIGINT,
- boundaries TEXT,
- topicDispatchRate TEXT,
- subscriptionDispatchRate TEXT,
- replicatorDispatchRate TEXT,
- clusterSubscribeRate TEXT,
- bookkeeperEnsemble BIGINT,
- bookkeeperWriteQuorum BIGINT,
- bookkeeperAckQuorum BIGINT,
- managedLedgerMaxMarkDeleteRate double,
- deduplicationEnabled BOOLEAN,
- latencyStatsSampleRate TEXT,
- messageTtlInSeconds BIGINT,
- retentionTimeInMinutes BIGINT,
- retentionSizeInMB BIGINT,
- deleted BOOLEAN,
- antiAffinityGroup VARCHAR(255),
- encryptionRequired BOOLEAN,
- subscriptionAuthMode VARCHAR(12),
- maxProducersPerTopic BIGINT,
- maxConsumersPerTopic BIGINT,
- maxConsumersPerSubscription BIGINT,
- compactionThreshold BIGINT,
- offloadThreshold BIGINT,
- offloadDeletionLagMs BIGINT,
- schemaValidationEnforced BOOLEAN,
- schemaAutoApdateCompatibilityStrategy VARCHAR(36),
- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
- CONSTRAINT PK_namespace PRIMARY KEY (tenant, namespace),
- UNIQUE (namespaceId)
-) ENGINE=InnoDB CHARACTER SET utf8;
--- ALTER table namespaces ADD INDEX namespaces_namespace_index(namespace);
-
-CREATE TABLE IF NOT EXISTS clusters (
- cluster varchar(255) NOT NULL PRIMARY KEY,
- clusterId BIGINT NOT NULL,
- serviceUrl varchar(1024),
- serviceUrlTls varchar(1024),
- brokerServiceUrl varchar(1024),
- brokerServiceUrlTls varchar(1024),
- peerClusterNames varchar(1024),
- UNIQUE (clusterId)
-) ENGINE=InnoDB CHARACTER SET utf8;
--- ALTER table clusters ADD INDEX clusters_cluster_index(cluster);
-
-CREATE TABLE IF NOT EXISTS brokers (
- broker varchar(1024) NOT NULL PRIMARY KEY,
- brokerId BIGINT NOT NULl,
- webServiceUrl varchar(1024),
- webServiceUrlTls varchar(1024),
- pulsarServiceUrl varchar(1024),
- pulsarServiceUrlTls varchar(1024),
- persistentTopicsEnabled BOOLEAN,
- nonPersistentTopicsEnabled BOOLEAN,
- brokerVersionString varchar(36),
- loadReportType varchar(36),
- maxResourceUsage double,
- UNIQUE (brokerId)
-) ENGINE=InnoDB CHARACTER SET utf8;
-
--- CREATE TABLE IF NOT EXISTS bundles (
--- broker varchar(1024) NOT NULL,
--- tenant varchar(255) NOT NULL,
--- namespace varchar(255) NOT NULL,
--- bundle varchar(1024) NOT NULL,
--- CONSTRAINT FK_broker FOREIGN KEY (broker) References brokers(broker),
--- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
--- CONSTRAINT FK_namespace FOREIGN KEY (namespace) References namespaces(namespace),
--- CONSTRAINT PK_bundle PRIMARY KEY (broker, tenant, namespace, bundle)
--- ) ENGINE=InnoDB CHARACTER SET utf8;
-
CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL,
broker varchar(1024) NOT NULL,
diff --git a/src/main/resources/META-INF/sql/sqlite-schema.sql b/src/main/resources/META-INF/sql/sqlite-schema.sql
index a68440e..dfc8c7c 100644
--- a/src/main/resources/META-INF/sql/sqlite-schema.sql
+++ b/src/main/resources/META-INF/sql/sqlite-schema.sql
@@ -12,94 +12,6 @@
-- limitations under the License.
--
-CREATE TABLE IF NOT EXISTS tenants (
- tenant varchar(255) NOT NULL PRIMARY KEY,
- tenantId INTEGER NOT NULL,
- adminRoles TEXT,
- allowedClusters TEXT,
- UNIQUE (tenantId)
-);
-
-
-CREATE TABLE IF NOT EXISTS namespaces (
- namespaceId INTEGER NOT NULL,
- tenant varchar(255) NOT NULL,
- namespace varchar(255) NOT NULL,
- authPolicies TEXT,
- backlogQuota TEXT,
- replicationClusters TEXT,
- numBundles INTEGER,
- boundaries TEXT,
- topicDispatchRate TEXT,
- subscriptionDispatchRate TEXT,
- replicatorDispatchRate TEXT,
- clusterSubscribeRate TEXT,
- bookkeeperEnsemble INTEGER,
- bookkeeperWriteQuorum INTEGER,
- bookkeeperAckQuorum INTEGER,
- managedLedgerMaxMarkDeleteRate double,
- deduplicationEnabled false,
- latencyStatsSampleRate TEXT,
- messageTtlInSeconds INTEGER,
- retentionTimeInMinutes INTEGER,
--- mysql use bigint
- retentionSizeInMB INTEGER,
- deleted false,
- antiAffinityGroup VARCHAR(255),
- encryptionRequired false,
- subscriptionAuthMode VARCHAR(12),
- maxProducersPerTopic INTEGER,
- maxConsumersPerTopic INTEGER,
- maxConsumersPerSubscription INTEGER,
- compactionThreshold INTEGER,
- offloadThreshold INTEGER,
- offloadDeletionLagMs INTEGER,
- schemaValidationEnforced false,
- schemaAutoApdateCompatibilityStrategy VARCHAR(36),
- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
- CONSTRAINT PK_namespace PRIMARY KEY (tenant, namespace)
- UNIQUE (namespaceId)
-);
-CREATE INDEX IF NOT EXISTS namespaces_namespace_index ON namespaces(namespace);
-
-CREATE TABLE IF NOT EXISTS clusters (
- cluster varchar(255) NOT NULL PRIMARY KEY,
- clusterId INTEGER NOT NULL,
- serviceUrl varchar(1024),
- serviceUrlTls varchar(1024),
- brokerServiceUrl varchar(1024),
- brokerServiceUrlTls varchar(1024),
- peerClusterNames varchar(1024),
- UNIQUE (clusterId)
-);
-CREATE INDEX IF NOT EXISTS clusters_cluster_index ON clusters(cluster);
-
-CREATE TABLE IF NOT EXISTS brokers (
- broker varchar(1024) NOT NULL PRIMARY KEY,
- brokerId INTEGER NOT NULl,
- webServiceUrl varchar(1024),
- webServiceUrlTls varchar(1024),
- pulsarServiceUrl varchar(1024),
- pulsarServiceUrlTls varchar(1024),
- persistentTopicsEnabled true,
- nonPersistentTopicsEnabled true,
- brokerVersionString varchar(36),
- loadReportType varchar(36),
- maxResourceUsage double,
- UNIQUE (brokerId)
-);
-
-CREATE TABLE IF NOT EXISTS bundles (
- broker varchar(1024) NOT NULL,
- tenant varchar(255) NOT NULL,
- namespace varchar(255) NOT NULL,
- bundle varchar(1024) NOT NULL,
- CONSTRAINT FK_broker FOREIGN KEY (broker) References brokers(broker),
- CONSTRAINT FK_tenant FOREIGN KEY (tenant) References tenants(tenant),
- CONSTRAINT FK_namespace FOREIGN KEY (namespace) References namespaces(namespace),
- CONSTRAINT PK_bundle PRIMARY KEY (broker, tenant, namespace, bundle)
-);
-
CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL,
broker varchar(1024) NOT NULL,
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 2b0ccf2..1141d6f 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -15,14 +15,13 @@
spring.cloud.refresh.refreshable=none
server.port=7750
-mybatis.type-aliases-package=com.manager.pulsar
+mybatis.type-aliases-package=io.streamnative.pulsar.manager
# database connection
spring.datasource.driver-class-name=org.sqlite.JDBC
spring.datasource.url=jdbc:sqlite:pulsar_manager.db
spring.datasource.initialization-mode=always
spring.datasource.schema=classpath:/META-INF/sql/sqlite-schema.sql
-#spring.datasource.data=classpath:/META-INF/sql/data.sql
spring.datasource.username=
spring.datasource.password=
spring.datasource.max-idle=10
@@ -41,24 +40,6 @@
# DEBUG print execute sql
logging.level.com.manager.=DEBUG
-# apache pulsar configuration
-pulsar.client.serviceUrl=pulsar://localhost:6650
-# pulsar.client.operationTimeout=
-# pulsar.client.ioThreads=
-# pulsar.client.listenerThreads=
-# pulsar.client.connectionsPerBroker=
-# pulsar.client.enableTcpNoDelay=
-# pulsar.client.tlsTrustCertsFilePath=
-# pulsar.client.allowTlsInsecureConnection=
-# pulsar.client.enableTlsHostnameVerification=
-# pulsar.client.statsInterval=
-# pulsar.client.maxConcurrentLookupRequests=
-# pulsar.client.maxLookupRequests=
-# pulsar.client.maxNumberOfRejectedRequestPerConnection=
-# pulsar.client.keepAliveInterval=
-# pulsar.client.connectionTimeout=
-# pulsar.client.startingBackoffInterval=
-# pulsar.client.maxBackoffInterval=
backend.directRequestBroker=true
backend.directRequestHost=http://localhost:8080
@@ -81,4 +62,4 @@
init.delay.interval=0
# cluster data reload
-cluster.cache.reload.interval.ms=60000
+cluster.cache.reload.interval.ms=60000
diff --git a/src/test/java/io/streamnative/pulsar/manager/client/ClientTest.java b/src/test/java/io/streamnative/pulsar/manager/client/ClientTest.java
deleted file mode 100644
index 51f8f5c..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/client/ClientTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client;
-
-import io.streamnative.pulsar.manager.client.config.ClientConfigurationData;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Pulsar Client Test.
- */
-public class ClientTest {
-
- @Test
- public void testClientInitConfig() {
- ClientBuilder clientBuilder = mock(ClientBuilder.class);
- ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
- Client client = new Client(clientConfigurationData);
-
- clientConfigurationData.setServiceUrl(null);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Serviceurl is incorrect", e.getMessage());
- }
-
- clientConfigurationData.setServiceUrl("http://xxx");
-
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Serviceurl is incorrect", e.getMessage());
- }
-
- clientConfigurationData.setServiceUrl("pulsar://localhost:6650");
- when(clientBuilder.serviceUrl(clientConfigurationData.getServiceUrl())).thenReturn(clientBuilder);
-
- clientConfigurationData.setOperationTimeout(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter operationTimeout should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setOperationTimeout(10);
- when(clientBuilder.operationTimeout(10, TimeUnit.MILLISECONDS)).thenReturn(clientBuilder);
-
- clientConfigurationData.setIoThreads(-10);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter ioThreads should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setIoThreads(10);
- when(clientBuilder.ioThreads(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setListenerThreads(-10);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter listenerThreads should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setListenerThreads(10);
- when(clientBuilder.listenerThreads(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setConnectionsPerBroker(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter connectionsPerBroker should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setConnectionsPerBroker(10);
- when(clientBuilder.connectionsPerBroker(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setTlsTrustCertsFilePath("");
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter tlsTrustCertsFilePath should be set", e.getMessage());
- }
-
- clientConfigurationData.setTlsTrustCertsFilePath("/etc/tlspath/");
- when(clientBuilder.tlsTrustCertsFilePath("/etc/tlspath/")).thenReturn(clientBuilder);
-
- clientConfigurationData.setStatsInterval(-1L);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter statsInterval should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setStatsInterval(10L);
- when(clientBuilder.statsInterval(10L, TimeUnit.SECONDS)).thenReturn(clientBuilder);
-
- clientConfigurationData.setMaxConcurrentLookupRequests(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter maxConcurrentLookupRequests should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setMaxConcurrentLookupRequests(10);
- when(clientBuilder.maxConcurrentLookupRequests(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setMaxLookupRequests(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter maxLookupRequests should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setMaxLookupRequests(10);
- when(clientBuilder.maxLookupRequests(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setMaxNumberOfRejectedRequestPerConnection(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter maxNumberOfRejectedRequestPerConnection should be greater than 0",
- e.getMessage());
- }
-
- clientConfigurationData.setMaxNumberOfRejectedRequestPerConnection(10);
- when(clientBuilder.maxNumberOfRejectedRequestPerConnection(10)).thenReturn(clientBuilder);
-
- clientConfigurationData.setKeepAliveInterval(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter keepAliveInterval should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setKeepAliveInterval(10);
- when(clientBuilder.keepAliveInterval(10, TimeUnit.SECONDS)).thenReturn(clientBuilder);
-
- clientConfigurationData.setConnectionTimeout(-1);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter connectionTimeout should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setConnectionTimeout(10);
- when(clientBuilder.connectionTimeout(10, TimeUnit.MILLISECONDS)).thenReturn(clientBuilder);
-
- clientConfigurationData.setStartingBackoffInterval(-1L);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter startingBackoffInterval should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setStartingBackoffInterval(10L);
- when(clientBuilder.startingBackoffInterval(10L, TimeUnit.MILLISECONDS)).thenReturn(clientBuilder);
-
- clientConfigurationData.setMaxBackoffInterval(-1L);
- try {
- client.checkAndInitClientConfig(clientBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter maxBackoffInterval should be greater than 0", e.getMessage());
- }
-
- clientConfigurationData.setMaxBackoffInterval(10L);
- when(clientBuilder.maxBackoffInterval(10L, TimeUnit.MILLISECONDS)).thenReturn(clientBuilder);
-
-
- clientConfigurationData.setEnableTcpNoDelay(false);
- when(clientBuilder.enableTcpNoDelay(false)).thenReturn(clientBuilder);
-
- clientConfigurationData.setAllowTlsInsecureConnection(false);
- when(clientBuilder.allowTlsInsecureConnection(false)).thenReturn(clientBuilder);
-
- clientConfigurationData.setEnableTlsHostnameVerification(false);
- when(clientBuilder.enableTlsHostnameVerification(false)).thenReturn(clientBuilder);
-
- client.checkAndInitClientConfig(clientBuilder);
-
- // start check
- verify(clientBuilder, atLeast(1)).enableTlsHostnameVerification(false);
- verify(clientBuilder, atLeast(1)).allowTlsInsecureConnection(false);
- verify(clientBuilder, atLeast(1)).enableTcpNoDelay(false);
- verify(clientBuilder, atLeast(1)).maxBackoffInterval(10L, TimeUnit.MILLISECONDS);
- verify(clientBuilder, atLeast(1)).startingBackoffInterval(10L, TimeUnit.MILLISECONDS);
- verify(clientBuilder, atLeast(1)).connectionTimeout(10, TimeUnit.MILLISECONDS);
- verify(clientBuilder, atLeast(1)).keepAliveInterval(10, TimeUnit.SECONDS);
- verify(clientBuilder, atLeast(1)).maxNumberOfRejectedRequestPerConnection(10);
- verify(clientBuilder, atLeast(1)).maxLookupRequests(10);
- verify(clientBuilder, atLeast(1)).maxConcurrentLookupRequests(10);
- verify(clientBuilder, atLeast(1)).statsInterval(10, TimeUnit.SECONDS);
- verify(clientBuilder, atLeast(1)).tlsTrustCertsFilePath("/etc/tlspath/");
- verify(clientBuilder, atLeast(1)).connectionsPerBroker(10);
- verify(clientBuilder, atLeast(1)).ioThreads(10);
- verify(clientBuilder, atLeast(1)).operationTimeout(10, TimeUnit.MILLISECONDS);
- verify(clientBuilder, atLeast(1)).serviceUrl(clientConfigurationData.getServiceUrl());
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainerTest.java b/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainerTest.java
deleted file mode 100644
index d1caeee..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerContainerTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.consumer;
-
-import io.streamnative.pulsar.manager.client.Client;
-import io.streamnative.pulsar.manager.client.PulsarApplicationListener;
-import io.streamnative.pulsar.manager.client.annotation.PulsarListener;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import io.streamnative.pulsar.manager.client.config.PulsarConsumerConfigRegister;
-import io.streamnative.pulsar.manager.client.utils.ParseAnnotation;
-import io.streamnative.pulsar.manager.client.utils.TestMessage;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Pulsar Consumer container test.
- */
-public class PulsarConsumerContainerTest {
-
- private final TestMessage testMessage = new TestMessage();
-
- private static final String receiveMessage = "hello-world";
-
- static class Foo {
- private String field1;
- private String field2;
- private int field3;
- }
-
- private class PulsarListenerAnnotationByte {
-
- @PulsarListener(topics = "test", subscriptionName = "xxxx")
- public void testReceive(Message message) {
- Assert.assertEquals(new String(message.getData()), receiveMessage);
- }
- }
-
- private class PulsarListenerAnnotationAvro {
-
- @PulsarListener(id = "test-container", topics = "test2",
- subscriptionName = "xxxx2", schema = Foo.class, schemaType = SchemaType.AVRO)
- public void testReceive(Message message) {
- Foo foo = (Foo) message.getValue();
- Assert.assertEquals(foo.field1, "a");
- Assert.assertEquals(foo.field2, "b");
- Assert.assertEquals(foo.field3, 4);
- }
- }
-
-
- @Test
- public void testPulsarConsumerContainer() throws PulsarClientException, InterruptedException {
- Client client = mock(Client.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
- Consumer consumer = mock(Consumer.class);
- ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
- PulsarApplicationListener pulsarApplicationListener = mock(PulsarApplicationListener.class);
- PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
- PulsarListenerAnnotationByte pulsarListenerAnnotation = new PulsarListenerAnnotationByte();
- when(pulsarApplicationListener.getClient()).thenReturn(client);
- pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
- ParseAnnotation.parse(pulsarListenerAnnotation);
- for (ConsumerConfigurationData consumerConfigurationData : ParseAnnotation.CONSUMER_CONFIGURATION_DATA) {
- pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
- }
- when(client.getPulsarClient()).thenReturn(pulsarClient);
- when(pulsarClient.newConsumer(Schema.BYTES)).thenReturn(consumerBuilder);
- when(consumerBuilder.subscribe()).thenReturn(consumer);
- when(consumer.toString()).thenReturn("consumer");
- testMessage.setData(receiveMessage.getBytes());
- when(consumer.receive()).thenReturn(testMessage);
- pulsarConsumerConfigRegister.afterPropertiesSet();
- Thread.sleep(10);
- pulsarConsumerConfigRegister.stopAllContainers();
- verify(consumer, atLeast(1)).receive();
- verify(consumer, atLeast(1)).acknowledgeAsync(testMessage);
- }
-
- @Test
- public void testPulsarConsumerContainerAvro() throws PulsarClientException, InterruptedException {
- Client client = mock(Client.class);
- PulsarClient pulsarClient = mock(PulsarClient.class);
- Consumer consumer = mock(Consumer.class);
- ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
- PulsarApplicationListener pulsarApplicationListener = mock(PulsarApplicationListener.class);
- PulsarConsumerConfigRegister pulsarConsumerConfigRegister = new PulsarConsumerConfigRegister();
- PulsarListenerAnnotationAvro pulsarListenerAnnotation = new PulsarListenerAnnotationAvro();
- when(pulsarApplicationListener.getClient()).thenReturn(client);
- pulsarConsumerConfigRegister.setPulsarApplicationListener(pulsarApplicationListener);
- ParseAnnotation.parse(pulsarListenerAnnotation);
- for (ConsumerConfigurationData consumerConfigurationData : ParseAnnotation.CONSUMER_CONFIGURATION_DATA) {
- pulsarConsumerConfigRegister.setConsumerContainer(consumerConfigurationData);
- }
- when(client.getPulsarClient()).thenReturn(pulsarClient);
-
- Schema fooSchema = Schema.AVRO(Foo.class);
- PulsarConsumerContainer pulsarConsumerContainer = pulsarConsumerConfigRegister
- .getConsumerContainer("test-container");
- pulsarConsumerContainer.getPulsarConsumer().setSchema(fooSchema);
- when(pulsarClient.newConsumer(fooSchema)).thenReturn(consumerBuilder);
- when(consumerBuilder.subscribe()).thenReturn(consumer);
- when(consumer.toString()).thenReturn("consumer");
- Foo foo = new Foo();
- foo.field1 = "a";
- foo.field2 = "b";
- foo.field3 = 4;
- testMessage.setValue(foo);
- when(consumer.receive()).thenReturn(testMessage);
- pulsarConsumerConfigRegister.afterPropertiesSet();
- Thread.sleep(10);
- pulsarConsumerConfigRegister.stopAllContainers();
- verify(consumer, atLeast(1)).receive();
- verify(consumer, atLeast(1)).acknowledgeAsync(testMessage);
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerTest.java b/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerTest.java
deleted file mode 100644
index 4d18e11..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/client/consumer/PulsarConsumerTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.consumer;
-
-import io.streamnative.pulsar.manager.client.Client;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.*;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Pulsar Consumer Test Class.
- */
-public class PulsarConsumerTest {
-
- @Test
- public void testPulsarConsumerInitConfig() {
- Client client = mock(Client.class);
- ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
-
- ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData();
- PulsarConsumer pulsarConsumer = new PulsarConsumer(client, consumerConfigurationData);
-
- when(consumerBuilder.subscriptionName(
- consumerConfigurationData.getSubscriptionName())).thenReturn(consumerBuilder);
-
- // test subscription name is null
- consumerConfigurationData.setSubscriptionName(null);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("The subscription name is incorrect", e.getMessage());
- }
- // test subscription name is ""
- consumerConfigurationData.setSubscriptionName("");
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("The subscription name is incorrect", e.getMessage());
- }
-
- consumerConfigurationData.setSubscriptionName("test-consumer");
-
- // test subscriptiontype is null
- consumerConfigurationData.setSubscriptionType(null);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("The subscription type should be set correctly." +
- "Exclusive, Failover, Shared and Key_Shared are currently supported.", e.getMessage());
- }
-
- consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
- when(consumerBuilder.subscriptionType(SubscriptionType.Exclusive)).thenReturn(consumerBuilder);
-
- String[] testNullTopics = {null};
- consumerConfigurationData.setTopics(testNullTopics);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Length of topic should be greater than 0", e.getMessage());
- }
-
- String[] testEmptyTopics = {""};
- consumerConfigurationData.setTopics(testEmptyTopics);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Length of topic should be greater than 0", e.getMessage());
- }
-
- String[] topics = {"test-topics"};
- consumerConfigurationData.setTopics(topics);
- when(consumerBuilder.topics(Arrays.asList(consumerConfigurationData.getTopics()))).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setAckTimeout(-1L);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter ackTimeout cannot be less than 10s", e.getMessage());
- }
-
- consumerConfigurationData.setAckTimeout(0L);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter ackTimeout cannot be less than 10s", e.getMessage());
- }
-
- consumerConfigurationData.setAckTimeout(10L);
- when(consumerBuilder.ackTimeout(10L, TimeUnit.SECONDS)).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setReceiverQueueSize(-1);
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter receiverQueueSize should be greater than 0", e.getMessage());
- }
-
- consumerConfigurationData.setReceiverQueueSize(1000);
- when(consumerBuilder.receiverQueueSize(1000)).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setAcknowledgmentGroupTime(-1L);
-
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter acknowledgmentGroupTime cannot be less than 0", e.getMessage());
- }
-
- consumerConfigurationData.setAcknowledgmentGroupTime(1000L);
- when(consumerBuilder.acknowledgmentGroupTime(1000L, TimeUnit.MILLISECONDS)).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setConsumerName("test-consumer");
- when(consumerBuilder.consumerName("test-consumer")).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setNegativeAckRedeliveryDelay(-1L);
-
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter negativeAckRedeliveryDelay cannot be less than 0", e.getMessage());
- }
-
- consumerConfigurationData.setNegativeAckRedeliveryDelay(1000L);
- when(consumerBuilder.negativeAckRedeliveryDelay(1000L, TimeUnit.MILLISECONDS)).thenReturn(consumerBuilder);
-
- consumerConfigurationData.setPriorityLevel(-1);
-
- try {
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- } catch (IllegalArgumentException e) {
- Assert.assertEquals("Parameter priorityLevel cannot be less than 0", e.getMessage());
- }
- consumerConfigurationData.setPriorityLevel(0);
- when(consumerBuilder.priorityLevel(0)).thenReturn(consumerBuilder);
- consumerConfigurationData.setRegexSubscriptionMode(RegexSubscriptionMode.PersistentOnly);
- when(consumerBuilder.subscriptionTopicsMode(
- consumerConfigurationData.getRegexSubscriptionMode())).thenReturn(consumerBuilder);
-
- pulsarConsumer.initConsumerConfig(consumerBuilder);
- verify(consumerBuilder, atLeast(1))
- .subscriptionName(consumerConfigurationData.getSubscriptionName());
- verify(consumerBuilder, atLeast(1))
- .subscriptionType(consumerConfigurationData.getSubscriptionType());
- verify(consumerBuilder, atLeast(1)).ackTimeout(10L, TimeUnit.SECONDS);
- verify(consumerBuilder, atLeast(1))
- .topics(Arrays.asList(consumerConfigurationData.getTopics()));
- verify(consumerBuilder, atLeast(1)).receiverQueueSize(1000);
- verify(consumerBuilder, atLeast(1))
- .acknowledgmentGroupTime(1000L, TimeUnit.MILLISECONDS);
- verify(consumerBuilder, atLeast(1)).consumerName("test-consumer");
- verify(consumerBuilder, atLeast(1))
- .negativeAckRedeliveryDelay(1000L, TimeUnit.MILLISECONDS);
- verify(consumerBuilder, atLeast(1)).priorityLevel(0);
- verify(consumerBuilder, atLeast(1))
- .subscriptionTopicsMode(consumerConfigurationData.getRegexSubscriptionMode());
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/client/utils/ParseAnnotation.java b/src/test/java/io/streamnative/pulsar/manager/client/utils/ParseAnnotation.java
deleted file mode 100644
index f3fc705..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/client/utils/ParseAnnotation.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.utils;
-
-import io.streamnative.pulsar.manager.client.annotation.PulsarListener;
-import io.streamnative.pulsar.manager.client.config.ConsumerConfigurationData;
-import org.springframework.aop.support.AopUtils;
-import org.springframework.core.MethodIntrospector;
-import org.springframework.core.annotation.AnnotatedElementUtils;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A tool to test after parse custom annotation.
- */
-public class ParseAnnotation {
-
- public static final List<ConsumerConfigurationData> CONSUMER_CONFIGURATION_DATA = new ArrayList<>();
-
- public static void parse(final Object bean) {
- Class<?> targetClass = AopUtils.getTargetClass(bean);
- Map<Method, Set<PulsarListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
- (MethodIntrospector.MetadataLookup<Set<PulsarListener>>) method -> {
- Set<PulsarListener> listenerMethods = new HashSet<>();
- PulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(method, PulsarListener.class);
- if (ann != null) {
- listenerMethods.add(ann);
- }
- return (!listenerMethods.isEmpty() ? listenerMethods : null);
- });
- if (!annotatedMethods.isEmpty()) {
- for (Map.Entry<Method, Set<PulsarListener>> entry : annotatedMethods.entrySet()) {
- Method method = entry.getKey();
- for (PulsarListener listener : entry.getValue()) {
- ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData();
- consumerConfigurationData.setId(listener.id());
- consumerConfigurationData.setTopics(listener.topics());
- consumerConfigurationData.setAckTimeout(listener.ackTimeout());
- consumerConfigurationData.setSubscriptionName(listener.subscriptionName());
- consumerConfigurationData.setSubscriptionType(listener.subscriptionType());
- consumerConfigurationData.setAcknowledgmentGroupTime(listener.acknowledgmentGroupTime());
- consumerConfigurationData.setNegativeAckRedeliveryDelay(listener.negativeAckRedeliveryDelay());
- consumerConfigurationData.setReceiverQueueSize(listener.receiverQueueSize());
- consumerConfigurationData.setTopics(listener.topics());
- consumerConfigurationData.setAcknowledgmentGroupTime(listener.acknowledgmentGroupTime());
- consumerConfigurationData.setTopicsPattern(listener.topicsPattern());
- consumerConfigurationData.setAutoUpdatePartitions(listener.autoUpdatePartitions());
- consumerConfigurationData.setConsumerName(listener.consumerName());
- consumerConfigurationData.setMethod(method);
- consumerConfigurationData.setBean(bean);
- consumerConfigurationData.setRegexSubscriptionMode(listener.regexSubscriptionMode());
- consumerConfigurationData.setSchema(listener.schema());
- consumerConfigurationData.setSchemaType(listener.schemaType());
- CONSUMER_CONFIGURATION_DATA.add(consumerConfigurationData);
- }
- }
- }
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/client/utils/TestMessage.java b/src/test/java/io/streamnative/pulsar/manager/client/utils/TestMessage.java
deleted file mode 100644
index d607d8c..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/client/utils/TestMessage.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.client.utils;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.api.EncryptionContext;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Implements interface Message for test.
- */
-public class TestMessage implements Message {
- private byte[] data;
-
- private Object value;
-
- private MessageId messageId;
-
- @Override
- public Map<String, String> getProperties() {
- return null;
- }
-
- @Override
- public boolean hasProperty(String s) {
- return false;
- }
-
- @Override
- public String getProperty(String s) {
- return null;
- }
-
- @Override
- public byte[] getData() {
- return data;
- }
-
- public void setData(byte[] data) {
- this.data = data;
- }
-
- @Override
- public Object getValue() {
- return value;
- }
-
- public void setValue(Object value) {
- this.value = value;
- }
-
- @Override
- public MessageId getMessageId() {
- return messageId;
- }
-
- public void setMessageId(MessageId messageId) {
- this.messageId = messageId;
- }
-
- @Override
- public long getPublishTime() {
- return 0;
- }
-
- @Override
- public long getEventTime() {
- return 0;
- }
-
- @Override
- public long getSequenceId() {
- return 0;
- }
-
- @Override
- public String getProducerName() {
- return null;
- }
-
- @Override
- public boolean hasKey() {
- return false;
- }
-
- @Override
- public String getKey() {
- return null;
- }
-
- @Override
- public boolean hasBase64EncodedKey() {
- return false;
- }
-
- @Override
- public byte[] getKeyBytes() {
- return new byte[0];
- }
-
- @Override
- public boolean hasOrderingKey() {
- return false;
- }
-
- @Override
- public byte[] getOrderingKey() {
- return new byte[0];
- }
-
- @Override
- public String getTopicName() {
- return null;
- }
-
- @Override
- public Optional<EncryptionContext> getEncryptionCtx() {
- return Optional.empty();
- }
-
- @Override
- public int getRedeliveryCount() {
- return 0;
- }
-
- @Override
- public byte[] getSchemaVersion() {
- return new byte[0];
- }
-
- @Override
- public boolean isReplicated() {
- return false;
- }
-
- @Override
- public String getReplicatedFrom() {
- return null;
- }
-}
\ No newline at end of file
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImplTest.java
deleted file mode 100644
index ae6bdae..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/dao/BrokersRepositoryImplTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.PulsarManagerApplication;
-import io.streamnative.pulsar.manager.entity.BrokerEntity;
-import io.streamnative.pulsar.manager.entity.BrokersRepository;
-import io.streamnative.pulsar.manager.profiles.SqliteDBTestProfile;
-import java.util.Optional;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-/**
- * Brokers crud test.
- */
-@RunWith(SpringRunner.class)
-@SpringBootTest(
- classes = {
- PulsarManagerApplication.class,
- SqliteDBTestProfile.class
- }
-)
-@ActiveProfiles("test")
-public class BrokersRepositoryImplTest {
-
- @Autowired
- private BrokersRepository brokersRepository;
-
- private void initBrokerEntity(BrokerEntity brokersEntity) {
- brokersEntity.setBrokerId(1);
- brokersEntity.setBroker("test-broker");
- brokersEntity.setWebServiceUrl("http://tengdeMBP:8080");
- brokersEntity.setWebServiceUrlTls("https://tengdeMBP:8080");
- brokersEntity.setPulsarServiceUrl("pulsar://tengdeMBP:6650");
- brokersEntity.setPulsarServiceUrlTls("pulsar+ssl://tengdeMBP:6650");
- brokersEntity.setPersistentTopicsEnabled(true);
- brokersEntity.setNonPersistentTopicsEnabled(false);
- brokersEntity.setBrokerVersionString("2.4.0-SNAPSHOT");
- brokersEntity.setLoadReportType("LocalBrokerData");
- brokersEntity.setMaxResourceUsage(0.185);
- }
-
- private void checkResult(Page<BrokerEntity> brokersEntityPage) {
- long total = brokersEntityPage.getTotal();
- Assert.assertEquals(total, 1);
- brokersEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getBrokerId(), 1);
- Assert.assertEquals(result.getBroker(), "test-broker");
- Assert.assertEquals(result.getWebServiceUrl(), "http://tengdeMBP:8080");
- Assert.assertEquals(result.getWebServiceUrlTls(), "https://tengdeMBP:8080");
- Assert.assertEquals(result.getPulsarServiceUrl(), "pulsar://tengdeMBP:6650");
- Assert.assertEquals(result.getPulsarServiceUrlTls(), "pulsar+ssl://tengdeMBP:6650");
- Assert.assertTrue(result.isPersistentTopicsEnabled());
- Assert.assertFalse(result.isNonPersistentTopicsEnabled());
- Assert.assertEquals(result.getBrokerVersionString(), "2.4.0-SNAPSHOT");
- Assert.assertEquals(result.getLoadReportType(), "LocalBrokerData");
- Assert.assertEquals(result.getMaxResourceUsage(), 0.185, 3);
- });
- }
-
- private void checkDeleteResult(Page<BrokerEntity> brokersEntityPage) {
- long total = brokersEntityPage.getTotal();
- Assert.assertEquals(total, 0);
- }
-
- @Test
- public void getBrokersList() {
- BrokerEntity brokersEntity = new BrokerEntity();
- initBrokerEntity(brokersEntity);
- brokersRepository.save(brokersEntity);
- Page<BrokerEntity> brokersEntityPage = brokersRepository.getBrokersList(1, 2);
- brokersEntityPage.count(true);
- checkResult(brokersEntityPage);
- brokersEntityPage.getResult().forEach((result) -> {
- brokersRepository.remove(result.getBroker());
- });
- Page<BrokerEntity> deleteBroker = brokersRepository.getBrokersList(1, 2);
- deleteBroker.count(true);
- checkDeleteResult(deleteBroker);
- }
-
- @Test
- public void getBroker() {
- BrokerEntity brokersEntity = new BrokerEntity();
- initBrokerEntity(brokersEntity);
- brokersRepository.save(brokersEntity);
- String broker = "test-broker";
- Optional<BrokerEntity> brokerEntity = brokersRepository.findByBroker(broker);
- Page<BrokerEntity> brokersEntityPage = new Page<BrokerEntity>();
- brokersEntityPage.add(brokerEntity.get());
- brokersEntityPage.setTotal(1);
- checkResult(brokersEntityPage);
- brokersRepository.remove("test-broker");
- Page<BrokerEntity> deleteBroker = brokersRepository.getBrokersList(1, 2);
- deleteBroker.count(true);
- checkDeleteResult(deleteBroker);
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/BundlesRespositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/BundlesRespositoryImplTest.java
deleted file mode 100644
index f535928..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/dao/BundlesRespositoryImplTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.PulsarManagerApplication;
-import io.streamnative.pulsar.manager.entity.BundleEntity;
-import io.streamnative.pulsar.manager.entity.BundlesRepository;
-import io.streamnative.pulsar.manager.profiles.SqliteDBTestProfile;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-/**
- * Bundles crud test.
- */
-@RunWith(SpringRunner.class)
-@SpringBootTest(
- classes = {
- PulsarManagerApplication.class,
- SqliteDBTestProfile.class
- }
-)
-@ActiveProfiles("test")
-public class BundlesRespositoryImplTest {
-
- @Autowired
- private BundlesRepository bundlesRepository;
-
- private void initBundleEntity(BundleEntity bundlesEntity) {
- bundlesEntity.setBroker("test-broker");
- bundlesEntity.setTenant("public");
- bundlesEntity.setNamespace("default");
- bundlesEntity.setBundle("0x80000000_0x90000000");
- }
-
- private void checkResult(Page<BundleEntity> bundlesEntityPage) {
- long total = bundlesEntityPage.getTotal();
- Assert.assertEquals(total, 1);
- bundlesEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getBroker(), "test-broker");
- Assert.assertEquals(result.getTenant(), "public");
- Assert.assertEquals(result.getNamespace(), "default");
- Assert.assertEquals(result.getBundle(), "0x80000000_0x90000000");
- });
- }
-
- private void checkDeleteResult(Page<BundleEntity> bundlesEntityPage) {
- long total = bundlesEntityPage.getTotal();
- Assert.assertEquals(total, 0);
- }
-
- @Test
- public void getBundlesList() {
- BundleEntity bundlesEntity = new BundleEntity();
- initBundleEntity(bundlesEntity);
- bundlesRepository.save(bundlesEntity);
- Page<BundleEntity> bundlesEntityPage = bundlesRepository.getBundlesList(1, 2);
- bundlesEntityPage.count(true);
- checkResult(bundlesEntityPage);
- bundlesEntityPage.getResult().forEach((result) -> {
- bundlesRepository.remove(result.getBroker(), result.getTenant(), result.getNamespace(), result.getBundle());
- });
- Page<BundleEntity> deleteBundle = bundlesRepository.getBundlesList(1, 2);
- deleteBundle.count(true);
- checkDeleteResult(deleteBundle);
- }
-
- @Test
- public void getByBrokerOrTenantOrNamespaceOrbundle() {
- BundleEntity bundlesEntity = new BundleEntity();
- initBundleEntity(bundlesEntity);
- bundlesRepository.save(bundlesEntity);
- String broker = "test-broker";
- Page<BundleEntity> bundlesEntityPageByBroker = bundlesRepository
- .findByBrokerOrTenantOrNamespaceOrBundle(1, 2, broker);
- bundlesEntityPageByBroker.count(true);
- checkResult(bundlesEntityPageByBroker);
-
- String tenant = "public";
- Page<BundleEntity> bundlesEntityPageByTenant = bundlesRepository
- .findByBrokerOrTenantOrNamespaceOrBundle(1, 2, tenant);
- bundlesEntityPageByTenant.count(true);
- checkResult(bundlesEntityPageByTenant);
-
- String namespace = "default";
- Page<BundleEntity> bundlesEntityPageByNamespace = bundlesRepository
- .findByBrokerOrTenantOrNamespaceOrBundle(1, 2, namespace);
- bundlesEntityPageByNamespace.count(true);
- checkResult(bundlesEntityPageByNamespace);
-
- String bundle = "0x80000000_0x90000000";
- Page<BundleEntity> bundlesEntityPageByBundle = bundlesRepository
- .findByBrokerOrTenantOrNamespaceOrBundle(1,2, bundle);
- bundlesEntityPageByBundle.count(true);
- checkResult(bundlesEntityPageByBundle);
-
- bundlesEntityPageByBundle.getResult().forEach((result) -> {
- bundlesRepository.remove(result.getBroker(), result.getTenant(), result.getNamespace(), result.getBundle());
- });
-
- Page<BundleEntity> deleteBundle = bundlesRepository.getBundlesList(1, 2);
- deleteBundle.count(true);
- checkDeleteResult(deleteBundle);
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImplTest.java
deleted file mode 100644
index 9ad127d..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/dao/ClustersRepositoryImplTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.PulsarManagerApplication;
-import io.streamnative.pulsar.manager.entity.ClusterEntity;
-import io.streamnative.pulsar.manager.entity.ClustersRepository;
-import io.streamnative.pulsar.manager.profiles.SqliteDBTestProfile;
-import org.apache.pulsar.shade.com.google.gson.Gson;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.LinkedHashSet;
-import java.util.Optional;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(
- classes = {
- PulsarManagerApplication.class,
- SqliteDBTestProfile.class
- }
-)
-@ActiveProfiles("test")
-public class ClustersRepositoryImplTest {
-
- @Autowired
- private ClustersRepository clustersRepository;
-
- private LinkedHashSet<String> clusterNames = new LinkedHashSet<String>() {{
- add("standalone");
- add("pulsar-cluster-1");
- }};
-
- private void initClustersEntity(ClusterEntity clustersEntity) {
- Gson gson = new Gson();
- clustersEntity.setClusterId(1);
- clustersEntity.setCluster("test-cluster");
- clustersEntity.setServiceUrl("http://localhost:8080");
- clustersEntity.setServiceUrlTls("https://localhost:443");
- clustersEntity.setBrokerServiceUrl("pulsar://localhost:6650");
- clustersEntity.setBrokerServiceUrlTls("pulsar+ssl://localhost:6650");
- clustersEntity.setPeerClusterNames(gson.toJson(clusterNames));
- }
-
- private void checkResult(Page<ClusterEntity> clustersEntityPage) {
- long total = clustersEntityPage.getTotal();
- Assert.assertEquals(total, 1);
- Gson gson = new Gson();
- clustersEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getClusterId(), 1);
- Assert.assertEquals(result.getCluster(), "test-cluster");
- Assert.assertEquals(result.getServiceUrl(), "http://localhost:8080");
- Assert.assertEquals(result.getServiceUrlTls(), "https://localhost:443");
- Assert.assertEquals(result.getBrokerServiceUrl(), "pulsar://localhost:6650");
- Assert.assertEquals(result.getBrokerServiceUrlTls(), "pulsar+ssl://localhost:6650");
- Assert.assertEquals(result.getPeerClusterNames(), gson.toJson(clusterNames));
- });
- }
-
- private void checkDeleteResult(Page<ClusterEntity> clustersEntityPage) {
- long total = clustersEntityPage.getTotal();
- Assert.assertEquals(total, 0);
- }
-
- @Test
- public void testGetClustersList() {
- ClusterEntity clustersEntity = new ClusterEntity();
- initClustersEntity(clustersEntity);
- clustersRepository.save(clustersEntity);
- Page<ClusterEntity> clustersEntities = clustersRepository.getClustersList(1, 2);
- clustersEntities.count(true);
- checkResult(clustersEntities);
- clustersEntities.getResult().forEach((result) -> {
- clustersRepository.remove(result.getCluster());
- });
- Page<ClusterEntity> deleteCluster = clustersRepository.getClustersList(1, 2);
- deleteCluster.count(true);
- checkDeleteResult(deleteCluster);
- }
-
- @Test
- public void getCluster() {
- ClusterEntity clustersEntity = new ClusterEntity();
- initClustersEntity(clustersEntity);
- clustersRepository.save(clustersEntity);
- String cluster = "test-cluster";
- Optional<ClusterEntity> clusterEntity = clustersRepository.findByCluster(cluster);
- Page<ClusterEntity> clustersEntities = new Page();
- clustersEntities.add(clusterEntity.get());
- clustersEntities.setTotal(1);
- checkResult(clustersEntities);
- clustersEntities.getResult().forEach((result) -> {
- clustersRepository.remove(result.getCluster());
- });
- Page<ClusterEntity> deleteCluster = clustersRepository.getClustersList(1, 2);
- deleteCluster.count(true);
- checkDeleteResult(deleteCluster);
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImplTest.java
deleted file mode 100644
index 6b84c03..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/dao/NamespacesRepositoryImplTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.PulsarManagerApplication;
-import io.streamnative.pulsar.manager.entity.NamespaceEntity;
-import io.streamnative.pulsar.manager.entity.NamespacesRepository;
-import io.streamnative.pulsar.manager.entity.TenantEntity;
-import io.streamnative.pulsar.manager.entity.TenantsRepository;
-import io.streamnative.pulsar.manager.profiles.SqliteDBTestProfile;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(
- classes = {
- PulsarManagerApplication.class,
- SqliteDBTestProfile.class
- }
-)
-@ActiveProfiles("test")
-public class NamespacesRepositoryImplTest {
-
- @Autowired
- private TenantsRepository tenantsRepository;
-
- @Autowired
- private NamespacesRepository namespacesRepository;
-
- public void initNamespaceEntity(NamespaceEntity namespacesEntity) {
- namespacesEntity.setNamespaceId(1);
- namespacesEntity.setTenant("test-namespace-public");
- namespacesEntity.setNamespace("test-namespace-default");
- namespacesEntity.setAuthPolicies("{\"namespace_auth\":{},\"destination_auth\":{},\"subscription_auth_roles\":{}}");
- namespacesEntity.setReplicationClusters("[\"pulsar-cluster-1\"]");
- namespacesEntity.setBoundaries("[\"0x00000000\",\"0x40000000\",\"0x80000000\",\"0xc0000000\",\"0xffffffff\"]");
- namespacesEntity.setNumBundles(4);
- namespacesEntity.setBacklogQuota("{}");
- namespacesEntity.setTopicDispatchRate("{}");
- namespacesEntity.setSubscriptionDispatchRate("{}");
- namespacesEntity.setReplicatorDispatchRate("{}");
- namespacesEntity.setClusterSubscribeRate("{}");
- namespacesEntity.setLatencyStatsSampleRate("{}");
- namespacesEntity.setMessageTtlInSeconds(0);
- namespacesEntity.setDeleted(true);
- namespacesEntity.setEncryptionRequired(false);
- namespacesEntity.setSubscriptionAuthMode("None");
- namespacesEntity.setMaxProducersPerTopic(0);
- namespacesEntity.setMaxConsumersPerTopic(0);
- namespacesEntity.setMaxConsumersPerSubscription(0);
- namespacesEntity.setCompactionThreshold(0);
- namespacesEntity.setOffloadThreshold(-1);
- namespacesEntity.setSchemaAutoApdateCompatibilityStrategy("FULL");
- namespacesEntity.setBookkeeperAckQuorum(0);
- namespacesEntity.setSchemaValidationEnforced(false);
- namespacesEntity.setManagedLedgerMaxMarkDeleteRate(0);
- namespacesEntity.setBookkeeperEnsemble(0);
- namespacesEntity.setBookkeeperWriteQuorum(0);
- }
-
- public void checkResult(Page<NamespaceEntity> namespacesEntityPage) {
- long total = namespacesEntityPage.getTotal();
- Assert.assertEquals(total, 1);
- namespacesEntityPage.getResult().forEach((result) -> {
- Assert.assertEquals(result.getNamespaceId(), 1);
- Assert.assertEquals(result.getTenant(), "test-namespace-public");
- Assert.assertEquals(result.getNamespace(), "test-namespace-default");
- Assert.assertEquals(result.getNumBundles(), 4);
- Assert.assertEquals(result.getBoundaries(),
- "[\"0x00000000\",\"0x40000000\",\"0x80000000\",\"0xc0000000\",\"0xffffffff\"]");
- Assert.assertEquals(result.getAuthPolicies(),
- "{\"namespace_auth\":{},\"destination_auth\":{},\"subscription_auth_roles\":{}}");
- Assert.assertEquals(result.getTopicDispatchRate(), "{}");
- Assert.assertEquals(result.getBacklogQuota(), "{}");
- Assert.assertEquals(result.getSubscriptionAuthMode(), "None");
- Assert.assertEquals(result.getReplicatorDispatchRate(), "{}");
- Assert.assertEquals(result.getClusterSubscribeRate(), "{}");
- Assert.assertEquals(result.getLatencyStatsSampleRate(), "{}");
- Assert.assertEquals(result.getMessageTtlInSeconds(), 0);
- Assert.assertEquals(result.isDeleted(), true);
- Assert.assertEquals(result.isEncryptionRequired(), false);
- Assert.assertEquals(result.getSubscriptionAuthMode(), "None");
- Assert.assertEquals(result.getMaxProducersPerTopic(), 0);
- Assert.assertEquals(result.getMaxConsumersPerTopic(), 0);
- Assert.assertEquals(result.getMaxConsumersPerSubscription(), 0);
- Assert.assertEquals(result.getCompactionThreshold(), 0);
- Assert.assertEquals(result.getOffloadThreshold(), -1);
- Assert.assertEquals(result.getSchemaAutoApdateCompatibilityStrategy(), "FULL");
- Assert.assertEquals(result.getBookkeeperAckQuorum(), 0);
- Assert.assertEquals(result.isSchemaValidationEnforced(), false);
- Assert.assertEquals(result.getBookkeeperEnsemble(), 0);
- Assert.assertEquals(result.getManagedLedgerMaxMarkDeleteRate(), 0, 0);
- Assert.assertEquals(result.getBookkeeperEnsemble(), 0);
- });
- }
-
- public void checkDeleteResult(Page<NamespaceEntity> namespacesEntityPage) {
- long total = namespacesEntityPage.getTotal();
- Assert.assertEquals(total, 0);
- }
-
- @Before
- public void setup() {
- prepareTenant();
- }
-
- @After
- public void clear() {
- clearTenant();
- }
-
- public void prepareTenant() {
- TenantEntity tenantsEntity = new TenantEntity(
- 1, "test-namespace-public", "testrole", "testCluster");
- tenantsRepository.save(tenantsEntity);
- }
-
- public void clearTenant() {
- tenantsRepository.removeByTenant("test-namespace-public");
- }
-
- @Test
- public void getNamespacesList() {
- NamespaceEntity namespacesEntity = new NamespaceEntity();
- initNamespaceEntity(namespacesEntity);
- namespacesRepository.save(namespacesEntity);
- Page<NamespaceEntity> namespacesEntityPage = namespacesRepository.getNamespacesList(1, 2);
- namespacesEntityPage.count(true);
- checkResult(namespacesEntityPage);
- namespacesEntityPage.getResult().forEach((result) -> {
- namespacesRepository.remove(result.getTenant(), result.getNamespace());
- });
- Page<NamespaceEntity> deleteNamespace = namespacesRepository.getNamespacesList(1, 2);
- deleteNamespace.count(true);
- checkDeleteResult(deleteNamespace);
- }
-
- @Test
- public void getNamespaceByTenantOrNamespace() {
- NamespaceEntity namespacesEntity = new NamespaceEntity();
- initNamespaceEntity(namespacesEntity);
- namespacesRepository.save(namespacesEntity);
- String tenant = "test-namespace-public";
- Page<NamespaceEntity> namespacesEntityPageByTenant = namespacesRepository.
- findByTenantOrNamespace(1, 2, tenant);
- namespacesEntityPageByTenant.count(true);
- checkResult(namespacesEntityPageByTenant);
- String namespace = "test-namespace-default";
- Page<NamespaceEntity> namespacesEntityPageByNamespace = namespacesRepository.
- findByTenantOrNamespace(1, 2, namespace);
- namespacesEntityPageByNamespace.count(true);
- checkResult(namespacesEntityPageByNamespace);
- namespacesEntityPageByNamespace.getResult().forEach((result) -> {
- namespacesRepository.remove(result.getTenant(), result.getNamespace());
- });
- Page<NamespaceEntity> deleteNamespace = namespacesRepository.getNamespacesList(1, 2);
- deleteNamespace.count(true);
- checkDeleteResult(deleteNamespace);
- }
-}
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImplTest.java
deleted file mode 100644
index abfa560..0000000
--- a/src/test/java/io/streamnative/pulsar/manager/dao/TenantsRepositoryImplTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.streamnative.pulsar.manager.dao;
-
-import com.github.pagehelper.Page;
-import io.streamnative.pulsar.manager.PulsarManagerApplication;
-import io.streamnative.pulsar.manager.entity.TenantEntity;
-import io.streamnative.pulsar.manager.entity.TenantsRepository;
-import io.streamnative.pulsar.manager.profiles.SqliteDBTestProfile;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import java.util.List;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(
- classes = {
- PulsarManagerApplication.class,
- SqliteDBTestProfile.class
- }
-)
-@ActiveProfiles("test")
-public class TenantsRepositoryImplTest {
-
- @Autowired
- private TenantsRepository tenantsRepository;
-
- @Test
- public void getTenantsListTest() {
- for (int i = 0; i < 10; i++) {
- TenantEntity tenantsEntity = new TenantEntity(i, "test" + i, "testrole", "testCluster");
- tenantsRepository.save(tenantsEntity);
- }
- Page<TenantEntity> tenantsEntities = tenantsRepository.getTenantsList(1, 10);
- tenantsEntities.count(true);
- long total = tenantsEntities.getTotal();
- Assert.assertEquals(total, 10);
- List<TenantEntity> tenantsEntityList = tenantsEntities.getResult();
- for (int i = 0; i < total; i ++) {
- TenantEntity tenantsEntity = tenantsEntityList.get(i);
- Assert.assertEquals(tenantsEntity.getTenantId(), i);
- Assert.assertEquals(tenantsEntity.getTenant(), "test" + i);
- Assert.assertEquals(tenantsEntity.getAdminRoles(), "testrole");
- Assert.assertEquals(tenantsEntity.getAllowedClusters(), "testCluster");
- }
- tenantsEntities.getResult().forEach((result) -> {
- tenantsRepository.remove(result.getTenantId());
- });
- }
-}