### Modifications (#190)
Currently, pulsar-manager supports pluggable databases, but some sql is different before different databases. For example, ` can be used in mysql and "" marks in postgresql. Therefore, this change ensures that sql can be executed in each database.
### Modifications
* All fields in the database are lowercase, including table names and column names.
* Replace the default database in docker from mysql to postgresql.
* Add the configuration of backend log output to file.
* Replace the database key that appears in the table.
* Remove mysql related configuration.
diff --git a/.gitignore b/.gitignore
index e94490a..0e3afd8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,4 @@
build/
*.db
front-end/data/
+*.log
diff --git a/README.md b/README.md
index 4e9d4a0..bbd7ef6 100644
--- a/README.md
+++ b/README.md
@@ -100,6 +100,12 @@
docker run -it -p 9527:9527 -e REDIRECT_HOST=front-end-ip -e REDIRECT_PORT=front-end-port -e DRIVER_CLASS_NAME=com.mysql.jdbc.Driver -e URL='jdbc-url' -e USERNAME=root -e PASSWORD=pulsar pulsar-manager /bin/sh
```
+ This is an example:
+
+ ```
+ docker run -it -p 9527:9527 -e REDIRECT_HOST=http://192.168.0.104 -e REDIRECT_PORT=9527 -e DRIVER_CLASS_NAME=org.postgresql.Driver -e URL='jdbc:postgresql://127.0.0.1:5432/pulsar_manager' -e USERNAME=pulsar -e PASSWORD=pulsar -v $PWD:/data pulsar-manager:latest /bin/sh
+ ```
+
* Build a local environment
(1) Download the source code.
diff --git a/build.gradle b/build.gradle
index e268194..ed42828 100644
--- a/build.gradle
+++ b/build.gradle
@@ -63,7 +63,7 @@
compile group: 'org.springframework.boot', name: 'spring-boot-devtools', version: springBootVersion
compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-netflix-zuul', version: springBootVersion
compile group: 'org.mybatis.spring.boot', name: 'mybatis-spring-boot-starter', version: springMybatisVersion
- compile group: 'mysql', name: 'mysql-connector-java', version: mysqlConnectorVersion
+ compile group: 'org.postgresql', name: 'postgresql', version: postgresqlVersion
compile group: 'javax.validation', name: 'validation-api', version: javaxValidationVersion
compile group: 'io.jsonwebtoken', name: 'jjwt', version: jsonWebTokenVersion
compile group: 'org.xerial', name: 'sqlite-jdbc', version: sqliteVersion
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 5aa47e5..f35604a 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -35,8 +35,7 @@
RUN apk add nginx \
&& apk add supervisor \
- && apk add mysql \
- && apk add mysql-client \
+ && apk add postgresql \
&& rm -rf /tmp/* \
&& rm /var/cache/apk/*
@@ -44,8 +43,6 @@
WORKDIR /pulsar-manager
-COPY docker/my.cnf /etc/
-
COPY build/libs/*.jar pulsar-manager.jar
COPY docker/supervisord.conf /etc/
diff --git a/docker/default.conf b/docker/default.conf
index 6cc76f0..74400d8 100644
--- a/docker/default.conf
+++ b/docker/default.conf
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
server {
listen 9527;
server_name 0.0.0.0;
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d8dac75..d5a84fe 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -1,13 +1,41 @@
#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
-echo 'Starting Mysql Server'
+echo 'Starting PostGreSQL Server'
-/pulsar-manager/startup.sh
+addgroup pulsar
+adduser --disabled-password --ingroup pulsar pulsar
+mkdir -p /run/postgresql
+chown -R pulsar:pulsar /run/postgresql/
+chown -R pulsar:pulsar /data
+chown pulsar:pulsar /pulsar-manager/init_db.sql
+chmod 750 /data
+
+su - pulsar -s /bin/sh /pulsar-manager/startup.sh
echo 'Starting Pulsar Manager Front end'
nginx
echo 'Starting Pulsar Manager Back end'
+touch /pulsar-manager/supervisor.sock
+chmod 777 /pulsar-manager/supervisor.sock
supervisord -c /etc/supervisord.conf -n
diff --git a/docker/init_db.sql b/docker/init_db.sql
index 824b6cb..ce02ea4 100644
--- a/docker/init_db.sql
+++ b/docker/init_db.sql
@@ -1,22 +1,31 @@
-USE mysql;
-FLUSH PRIVILEGES;
-GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY "pulsar" WITH GRANT OPTION;
-GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
-UPDATE user SET password=PASSWORD("pulsar") WHERE user='root';
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+alter user pulsar with password 'pulsar';
+CREATE DATABASE pulsar_manager OWNER pulsar;
+GRANT ALL PRIVILEGES ON DATABASE pulsar_manager to pulsar;
-CREATE DATABASE IF NOT EXISTS pulsar_manager;
-
-USE pulsar_manager;
+\c pulsar_manager;
CREATE TABLE IF NOT EXISTS environments (
name varchar(256) NOT NULL,
broker varchar(1024) NOT NULL,
CONSTRAINT PK_name PRIMARY KEY (name),
UNIQUE (broker)
-) ENGINE=InnoDB CHARACTER SET utf8;
+);
-CREATE TABLE IF NOT EXISTS topicsStats (
- topicStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
+CREATE TABLE IF NOT EXISTS topics_stats (
+ topic_stats_id BIGSERIAL PRIMARY KEY,
environment varchar(255) NOT NULL,
cluster varchar(255) NOT NULL,
broker varchar(255) NOT NULL,
@@ -25,85 +34,85 @@
bundle varchar(255) NOT NULL,
persistent varchar(36) NOT NULL,
topic varchar(255) NOT NULL,
- producerCount BIGINT,
- subscriptionCount BIGINT,
- msgRateIn double,
- msgThroughputIn double,
- msgRateOut double,
- msgThroughputOut double,
- averageMsgSize double,
- storageSize double,
- timestamp BIGINT
-) ENGINE=InnoDB CHARACTER SET utf8;
+ producer_count BIGINT,
+ subscription_count BIGINT,
+ msg_rate_in double precision ,
+ msg_throughput_in double precision ,
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ average_msg_size double precision ,
+ storage_size double precision ,
+ time_stamp BIGINT
+);
-CREATE TABLE IF NOT EXISTS publishersStats (
- publisherStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- producerId BIGINT,
- topicStatsId BIGINT NOT NULL,
- producerName varchar(255) NOT NULL,
- msgRateIn double,
- msgThroughputIn double,
- averageMsgSize double,
+CREATE TABLE IF NOT EXISTS publishers_stats (
+ publisher_stats_id BIGSERIAL PRIMARY KEY,
+ producer_id BIGINT,
+ topic_stats_id BIGINT NOT NULL,
+ producer_name varchar(255) NOT NULL,
+ msg_rate_in double precision ,
+ msg_throughput_in double precision ,
+ average_msg_size double precision ,
address varchar(255),
- connectedSince varchar(128),
- clientVersion varchar(36),
+ connected_since varchar(128),
+ client_version varchar(36),
metadata text,
- timestamp BIGINT,
- CONSTRAINT FK_publishers_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ time_stamp BIGINT,
+ CONSTRAINT fk_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
-CREATE TABLE IF NOT EXISTS replicationsStats (
- replicationStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- topicStatsId BIGINT NOT NULL,
+CREATE TABLE IF NOT EXISTS replications_stats (
+ replication_stats_id BIGSERIAL PRIMARY KEY,
+ topic_stats_id BIGINT NOT NULL,
cluster varchar(255) NOT NULL,
connected BOOLEAN,
- msgRateIn double,
- msgRateOut double,
- msgRateExpired double,
- msgThroughputIn double,
- msgThroughputOut double,
- msgRateRedeliver double,
- replicationBacklog BIGINT,
- replicationDelayInSeconds BIGINT,
- inboundConnection varchar(255),
- inboundConnectedSince varchar(255),
- outboundConnection varchar(255),
- outboundConnectedSince varchar(255),
- timestamp BIGINT,
- CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ msg_rate_in double precision ,
+ msg_rate_out double precision ,
+ msg_rate_expired double precision ,
+ msg_throughput_in double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ replication_backlog BIGINT,
+ replication_delay_in_seconds BIGINT,
+ inbound_connection varchar(255),
+ inbound_connected_since varchar(255),
+ outbound_connection varchar(255),
+ outbound_connected_since varchar(255),
+ time_stamp BIGINT,
+ CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
-CREATE TABLE IF NOT EXISTS subscriptionsStats (
- subscriptionStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- topicStatsId BIGINT NOT NULL,
+CREATE TABLE IF NOT EXISTS subscriptions_stats (
+ subscription_stats_id BIGSERIAL PRIMARY KEY,
+ topic_stats_id BIGINT NOT NULL,
subscription varchar(255) NULL,
- msgBacklog BIGINT,
- msgRateExpired double,
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- numberOfEntriesSinceFirstNotAckedMessage BIGINT,
- totalNonContiguousDeletedMessagesRange BIGINT,
- subscriptionType varchar(16),
- blockedSubscriptionOnUnackedMsgs BOOLEAN,
- timestamp BIGINT,
- UNIQUE (topicStatsId, subscription),
- CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ msg_backlog BIGINT,
+ msg_rate_expired double precision ,
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ number_of_entries_since_first_not_acked_message BIGINT,
+ total_non_contiguous_deleted_messages_range BIGINT,
+ subscription_type varchar(16),
+ blocked_subscription_on_unacked_msgs BOOLEAN,
+ time_stamp BIGINT,
+ UNIQUE (topic_stats_id, subscription),
+ CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
-CREATE TABLE IF NOT EXISTS consumersStats (
- consumerStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
+CREATE TABLE IF NOT EXISTS consumers_stats (
+ consumer_stats_id BIGSERIAL PRIMARY KEY,
consumer varchar(255) NOT NULL,
- topicStatsId BIGINT NOT NUll,
- replicationStatsId BIGINT,
- subscriptionStatsId BIGINT,
+ topic_stats_id BIGINT NOT NUll,
+ replication_stats_id BIGINT,
+ subscription_stats_id BIGINT,
address varchar(255),
- availablePermits BIGINT,
- connectedSince varchar(255),
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- clientVersion varchar(36),
- timestamp BIGINT,
+ available_permits BIGINT,
+ connected_since varchar(255),
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ client_version varchar(36),
+ time_stamp BIGINT,
metadata text
-) ENGINE=InnoDB CHARACTER SET utf8;
\ No newline at end of file
+);
\ No newline at end of file
diff --git a/docker/my.cnf b/docker/my.cnf
deleted file mode 100644
index b3ef356..0000000
--- a/docker/my.cnf
+++ /dev/null
@@ -1,6 +0,0 @@
-[mysqld]
-user = root
-datadir = /app/mysql
-port = 3306
-log-bin = /app/mysql/mysql-bin
-bind-address = 0.0.0.0
diff --git a/docker/startup.sh b/docker/startup.sh
index e821ab5..04f756c 100755
--- a/docker/startup.sh
+++ b/docker/startup.sh
@@ -1,20 +1,25 @@
#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
-if [ -d /app/mysql ]; then
- echo "[i] MySQL directory already present, skipping creation"
-else
- echo "[i] MySQL data directory not found, creating initial DBs"
-
- mysql_install_db --user=root > /dev/null
- mkdir -p /app/mysql
-
- if [ ! -d "/run/mysqld" ]; then
- mkdir -p /run/mysqld
- fi
-
- /usr/bin/mysqld --user=root --bootstrap --verbose=0 < /pulsar-manager/init_db.sql
-fi
-
-mysqld_safe &
-
+initdb -D /data/postgresql
+pg_ctl -D /data/postgresql start
+createdb
+psql -f /pulsar-manager/init_db.sql
diff --git a/docker/supervisord.conf b/docker/supervisord.conf
index b98ad58..bd93427 100644
--- a/docker/supervisord.conf
+++ b/docker/supervisord.conf
@@ -1,5 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
[supervisorctl]
-serverurl=unix:///tmp/supervisor.sock
+serverurl=unix:///pulsar-manager/supervisor.sock
[supervisord]
logfile_maxbytes=50MB
diff --git a/gradle.properties b/gradle.properties
index 1a3dabf..b157a58 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,6 +1,5 @@
# dependencies version
springMybatisVersion=1.3.2
-mysqlConnectorVersion=5.1.37
javaxValidationVersion=2.0.0.Final
jsonWebTokenVersion=0.9.0
sqliteVersion=3.21.0.1
@@ -14,3 +13,4 @@
apiMockitoVersion=1.7.1
mockitoJunit4Version=1.7.1
gsonVersion=2.8.2
+postgresqlVersion=42.2.5
diff --git a/src/README.md b/src/README.md
index d72ab8b..ac4357e 100644
--- a/src/README.md
+++ b/src/README.md
@@ -26,21 +26,17 @@
### Use custom databases
-If you have a large amount of data, you can use a custom database. The following is an example of MySQL.
+If you have a large amount of data, you can use a custom database. The following is an example of PostgreSQL.
-1. Initialize database and table structures using [file](https://github.com/streamnative/pulsar-manager/tree/master/src/main/resources/META-INF/sql/mysql-schema.sql).
+1. Initialize database and table structures using [file](https://github.com/streamnative/pulsar-manager/tree/master/src/main/resources/META-INF/sql/postgresql-schema.sql).
-2. Modify the [configuration file](https://github.com/streamnative/pulsar-manager/blob/master/src/main/resources/application.properties) and add MySQL configuration
+2. Modify the [configuration file](https://github.com/streamnative/pulsar-manager/blob/master/src/main/resources/application.properties) and add PostgreSQL configuration
```
-spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-spring.datasource.url=jdbc:mysql://ip:port/pulsar_manager?useSSL=false
-spring.datasource.username=username
-spring.datasource.password=password
-spring.datasource.max-idle=10
-spring.datasource.max-wait=10000
-spring.datasource.min-idle=5
-spring.datasource.initial-size=5
+spring.datasource.driver-class-name=org.postgresql.Driver
+spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/pulsar_manager
+spring.datasource.username=postgres
+spring.datasource.password=postgres
```
3. Compile to generate a new executable jar package
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/ConsumerStatsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/ConsumerStatsMapper.java
index 01feed7..5976a76 100644
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/ConsumerStatsMapper.java
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/ConsumerStatsMapper.java
@@ -20,36 +20,42 @@
@Mapper
public interface ConsumerStatsMapper {
- @Insert("INSERT INTO consumersStats(consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
- "availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
- "clientVersion,timestamp,metadata) " +
+ @Insert("INSERT INTO consumers_stats(consumer,topic_stats_id,replication_stats_id,subscription_stats_id,address," +
+ "available_permits,connected_since,msg_rate_out,msg_throughput_out,msg_rate_redeliver," +
+ "client_version,time_stamp,metadata) " +
"VALUES(#{consumer},#{topicStatsId},#{replicationStatsId},#{subscriptionStatsId},#{address}," +
"#{availablePermits},#{connectedSince},#{msgRateOut},#{msgThroughputOut},#{msgRateRedeliver}," +
"#{clientVersion},#{timestamp},#{metadata})")
- @Options(useGeneratedKeys=true, keyProperty="consumerStatsId", keyColumn="consumerStatsId")
+ @Options(useGeneratedKeys=true, keyProperty="consumerStatsId", keyColumn="consumer_stats_id")
void save(ConsumerStatsEntity consumerStatsEntity);
- @Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
- "availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
- "clientVersion,timestamp,metadata FROM consumersStats " +
- "where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT consumer_stats_id as consumerStatsId,consumer as consumer,topic_stats_id as topicStatsId," +
+ "replication_stats_id as replicationStatsId,subscription_stats_id as subscriptionStatsId,address as address," +
+ "available_permits as availablePermits,connected_since as connectedSince,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,msg_rate_redeliver as msgRateRedeliver," +
+ "client_version as clientVersion,time_stamp as timestamp,metadata as metadata FROM consumers_stats " +
+ "where topic_stats_id=#{topicStatsId} and time_stamp=#{timestamp}")
Page<ConsumerStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);
- @Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
- "availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
- "clientVersion,timestamp,metadata FROM consumersStats " +
- "where subscriptionStatsId=#{subscriptionStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT consumer_stats_id as consumerStatsId,consumer as consumer,topic_stats_id as topicStatsId," +
+ "replication_stats_id as replicationStatsId,subscription_stats_id as subscriptionStatsId,address as address," +
+ "available_permits as availablePermits,connected_since as connectedSince,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,msg_rate_redeliver as msgRateRedeliver," +
+ "client_version as clientVersion,time_stamp as timestamp,metadata as metadata FROM consumers_stats " +
+ "where subscription_stats_id=#{subscriptionStatsId} and time_stamp=#{timestamp}")
Page<ConsumerStatsEntity> findBySubscriptionStatsId(@Param("subscriptionStatsId") long subscriptionStatsId,
@Param("timestamp") long timestamp);
- @Select("SELECT consumerStatsId,consumer,topicStatsId,replicationStatsId,subscriptionStatsId,address," +
- "availablePermits,connectedSince,msgRateOut,msgThroughputOut,msgRateRedeliver," +
- "clientVersion,timestamp,metadata FROM consumersStats " +
- "where replicationStatsId=#{replicationStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT consumer_stats_id as consumerStatsId,consumer as consumer,topic_stats_id as topicStatsId," +
+ "replication_stats_id as replicationStatsId,subscription_stats_id as subscriptionStatsId,address as address," +
+ "available_permits as availablePermits,connected_since as connectedSince,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,msg_rate_redeliver as msgRateRedeliver," +
+ "client_version as clientVersion,time_stamp as timestamp,metadata as metadata FROM consumers_stats " +
+ "where replication_stats_id=#{replicationStatsId} and time_stamp=#{timestamp}")
Page<ConsumerStatsEntity> findByReplicationStatsId(@Param("replicationStatsId") long replicationStatsId,
@Param("timestamp") long timestamp);
- @Delete("DELETE FROM consumersStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
+ @Delete("DELETE FROM consumers_stats WHERE #{nowTime} - #{timeInterval} >= time_stamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/PublishersStatsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/PublishersStatsMapper.java
index 5ef226c..a352082 100644
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/PublishersStatsMapper.java
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/PublishersStatsMapper.java
@@ -20,19 +20,21 @@
@Mapper
public interface PublishersStatsMapper {
- @Insert("INSERT INTO publishersStats(producerId,topicStatsId,producerName,msgRateIn," +
- "msgThroughputIn,averageMsgSize,address,connectedSince,clientVersion,metadata,timestamp) " +
+ @Insert("INSERT INTO publishers_stats(producer_id,topic_stats_id,producer_name,msg_rate_in," +
+ "msg_throughput_in,average_msg_size,address,connected_since,client_version,metadata,time_stamp) " +
"VALUES(#{producerId},#{topicStatsId},#{producerName},#{msgRateIn},#{msgThroughputIn}," +
"#{averageMsgSize},#{address},#{connectedSince},#{clientVersion},#{metadata},#{timestamp})")
- @Options(useGeneratedKeys=true, keyProperty="publisherStatsId", keyColumn="publisherStatsId")
+ @Options(useGeneratedKeys=true, keyProperty="publisherStatsId", keyColumn="publisher_stats_id")
void save(PublisherStatsEntity publisherStatsEntity);
- @Select("SELECT publisherStatsId,producerId,topicStatsId,producerName,msgRateIn,msgThroughputIn,averageMsgSize," +
- "address,connectedSince,clientVersion,metadata,timestamp From publishersStats " +
- "WHERE topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT publisher_stats_id as publisherStatsId,producer_id as producerId,topic_stats_id as topicStatsId," +
+ "producer_name as producerName,msg_rate_in as msgRateIn,msg_throughput_in as msgThroughputIn," +
+ "average_msg_size as averageMsgSize,address as address,connected_since as connectedSince," +
+ "client_version as clientVersion,metadata as metadata,time_stamp as timestamp From publishers_stats " +
+ "WHERE topic_stats_id=#{topicStatsId} and time_stamp=#{timestamp}")
Page<PublisherStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);
- @Delete("DELETE FROM publishersStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
+ @Delete("DELETE FROM publishers_stats WHERE #{nowTime} - #{timeInterval} >= time_stamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/ReplicationsStatsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/ReplicationsStatsMapper.java
index 9c9219c..b417cb0 100644
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/ReplicationsStatsMapper.java
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/ReplicationsStatsMapper.java
@@ -20,23 +20,27 @@
@Mapper
public interface ReplicationsStatsMapper {
- @Insert("INSERT INTO replicationsStats(topicStatsId,cluster,connected,msgRateIn,msgRateOut,msgThroughputIn," +
- "msgThroughputOut,replicationBacklog,replicationDelayInSeconds,inboundConnection," +
- "inboundConnectedSince,outboundConnection,outboundConnectedSince,timestamp,msgRateExpired) " +
+ @Insert("INSERT INTO replications_stats(topic_stats_id,cluster,connected,msg_rate_in,msg_rate_out,msg_throughput_in," +
+ "msg_throughput_out,replication_backlog,replication_delay_in_seconds,inbound_connection," +
+ "inbound_connected_since,outbound_connection,outbound_connected_since,time_stamp,msg_rate_expired) " +
"VALUES(#{topicStatsId},#{cluster},#{connected},#{msgRateIn},#{msgRateOut},#{msgThroughputIn}," +
"#{msgThroughputOut},#{replicationBacklog},#{replicationDelayInSeconds}," +
"#{inboundConnection},#{inboundConnectedSince},#{outboundConnection},#{outboundConnectedSince}," +
"#{timestamp},#{msgRateExpired})")
- @Options(useGeneratedKeys=true, keyProperty="replicationStatsId", keyColumn="replicationStatsId")
+ @Options(useGeneratedKeys=true, keyProperty="replicationStatsId", keyColumn="replication_stats_id")
void save(ReplicationStatsEntity replicationStatsEntity);
- @Select("SELECT replicationStatsId,topicStatsId,cluster,connected,msgRateIn,msgRateOut,msgThroughputIn,msgThroughputOut," +
- "replicationBacklog,replicationDelayInSeconds,inboundConnection,inboundConnectedSince," +
- "outboundConnection,outboundConnectedSince,timestamp,msgRateExpired FROM replicationsStats " +
- "where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT replication_stats_id as replicationStatsId,topic_stats_id as topicStatsId,cluster as cluster," +
+ "connected as connected,msg_rate_in as msgRateIn,msg_rate_out as msgRateOut," +
+ "msg_throughput_in as msgThroughputIn,msg_throughput_out as msgThroughputOut," +
+ "replication_backlog as replicationBacklog,replication_delay_in_seconds as replicationDelayInSeconds," +
+ "inbound_connection as inboundConnection,inbound_connected_since as inboundConnectedSince," +
+ "outbound_connection as outboundConnection,outbound_connected_since as outboundConnectedSince," +
+ "time_stamp as timestamp,msg_rate_expired as msgRateExpired FROM replications_stats " +
+ "where topic_stats_id=#{topicStatsId} and time_stamp=#{timestamp}")
Page<ReplicationStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);
- @Delete("DELETE FROM replicationsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
+ @Delete("DELETE FROM replications_stats WHERE #{nowTime} - #{timeInterval} >= time_stamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/SubscriptionsStatsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/SubscriptionsStatsMapper.java
index 559b85f..e940587 100644
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/SubscriptionsStatsMapper.java
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/SubscriptionsStatsMapper.java
@@ -20,23 +20,26 @@
@Mapper
public interface SubscriptionsStatsMapper {
- @Insert("INSERT INTO subscriptionsStats(topicStatsId,subscription,msgBacklog,msgRateExpired," +
- "msgRateOut,msgThroughputOut,msgRateRedeliver,numberOfEntriesSinceFirstNotAckedMessage," +
- "totalNonContiguousDeletedMessagesRange,subscriptionType,timestamp) " +
+ @Insert("INSERT INTO subscriptions_stats(topic_stats_id,subscription,msg_backlog,msg_rate_expired," +
+ "msg_rate_out,msg_throughput_out,msg_rate_redeliver,number_of_entries_since_first_not_acked_message," +
+ "total_non_contiguous_deleted_messages_range,subscription_type,time_stamp) " +
"VALUES(#{topicStatsId},#{subscription},#{msgBacklog},#{msgRateExpired},#{msgRateOut}," +
"#{msgThroughputOut},#{msgRateRedeliver},#{numberOfEntriesSinceFirstNotAckedMessage}," +
"#{totalNonContiguousDeletedMessagesRange},#{subscriptionType}," +
"#{timestamp})")
- @Options(useGeneratedKeys=true, keyProperty="subscriptionStatsId", keyColumn="subscriptionStatsId")
+ @Options(useGeneratedKeys=true, keyProperty="subscriptionStatsId", keyColumn="subscription_stats_id")
void save(SubscriptionStatsEntity subscriptionStatsEntity);
- @Select("SELECT subscriptionStatsId,topicStatsId,subscription,msgBacklog,msgRateExpired,msgRateOut," +
- "msgThroughputOut,msgRateRedeliver,numberOfEntriesSinceFirstNotAckedMessage," +
- "totalNonContiguousDeletedMessagesRange,subscriptionType,timestamp FROM subscriptionsStats " +
- "where topicStatsId=#{topicStatsId} and timestamp=#{timestamp}")
+ @Select("SELECT subscription_stats_id as subscriptionStatsId,topic_stats_id as topicStatsId," +
+ "subscription as subscription,msg_backlog as msgBacklog,msg_rate_expired as msgRateExpired," +
+ "msg_rate_out as msgRateOut,msg_throughput_out as msgThroughputOut,msg_rate_redeliver as msgRateRedeliver," +
+ "number_of_entries_since_first_not_acked_message as numberOfEntriesSinceFirstNotAckedMessage," +
+ "total_non_contiguous_deleted_messages_range as totalNonContiguousDeletedMessagesRange," +
+ "subscription_type as subscriptionType,time_stamp as timestamp FROM subscriptions_stats " +
+ "where topic_stats_id=#{topicStatsId} and time_stamp=#{timestamp}")
Page<SubscriptionStatsEntity> findByTopicStatsId(@Param("topicStatsId") long topicStatsId,
@Param("timestamp") long timestamp);
- @Delete("DELETE FROM subscriptionsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
+ @Delete("DELETE FROM subscriptions_stats WHERE #{nowTime} - #{timeInterval} >= time_stamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/TopicsStatsMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/TopicsStatsMapper.java
index b70d466..2c1c608 100644
--- a/src/main/java/io/streamnative/pulsar/manager/mapper/TopicsStatsMapper.java
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/TopicsStatsMapper.java
@@ -22,33 +22,45 @@
@Mapper
public interface TopicsStatsMapper {
- @Insert("INSERT INTO topicsStats(environment, cluster,broker,tenant,namespace,bundle,persistent,topic," +
- "producerCount,subscriptionCount,msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut," +
- "averageMsgSize,storageSize,timestamp) " +
+ @Insert("INSERT INTO topics_stats(environment, cluster,broker,tenant,namespace,bundle,persistent,topic," +
+ "producer_count,subscription_count,msg_rate_in,msg_throughput_in,msg_rate_out,msg_throughput_out," +
+ "average_msg_size,storage_size,time_stamp) " +
"VALUES(#{environment},#{cluster},#{broker},#{tenant},#{namespace},#{bundle},#{persistent},#{topic}," +
"#{producerCount},#{subscriptionCount},#{msgRateIn},#{msgThroughputIn},#{msgRateOut},#{msgThroughputOut}," +
"#{averageMsgSize},#{storageSize},#{timestamp})")
- @Options(useGeneratedKeys=true, keyProperty="topicStatsId", keyColumn="topicStatsId")
+ @Options(useGeneratedKeys=true, keyProperty="topicStatsId", keyColumn="topic_stats_id")
void insert(TopicStatsEntity topicStatsEntity);
- @Select("SELECT topicStatsId,environment,cluster,broker,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
- "msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
+ @Select("SELECT topic_stats_id as topicStatsId,environment as environment,cluster as cluster,broker as broker," +
+ "tenant as tenant,namespace as namespace,bundle as bundle,persistent as persistent," +
+ "topic as topic,producer_count as producerCount,subscription_count as subscriptionCount," +
+ "msg_rate_in as msgRateIn,msg_throughput_in as msgThroughputIn,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,average_msg_size as averageMsgSize,storage_size as storageSize," +
+ "time_stamp as timestamp FROM topics_stats " +
"ORDER BY timestamp DESC limit 1 ")
TopicStatsEntity findMaxTime();
- @Select("SELECT topicStatsId,environment,cluster,broker,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
- "msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
- "WHERE environment=#{environment} and cluster=#{cluster} and broker=#{broker} and timestamp=#{timestamp}")
+ @Select("SELECT topic_stats_id as topicStatsId,environment as environment,cluster as cluster,broker as broker," +
+ "tenant as tenant,namespace as namespace,bundle as bundle,persistent as persistent," +
+ "topic as topic,producer_count as producerCount,subscription_count as subscriptionCount," +
+ "msg_rate_in as msgRateIn,msg_throughput_in as msgThroughputIn,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,average_msg_size as averageMsgSize,storage_size as storageSize," +
+ "time_stamp as timestamp FROM topics_stats " +
+ "WHERE environment=#{environment} and cluster=#{cluster} and broker=#{broker} and time_stamp=#{timestamp}")
Page<TopicStatsEntity> findByClusterBroker(
@Param("environment") String environment,
@Param("cluster") String cluster,
@Param("broker") String broker,
@Param("timestamp") long timestamp);
- @Select("SELECT topicStatsId,environment,cluster,tenant,namespace,bundle,persistent,topic,producerCount,subscriptionCount," +
- "msgRateIn,msgThroughputIn,msgRateOut,msgThroughputOut,averageMsgSize,storageSize,timestamp FROM topicsStats " +
+ @Select("SELECT topic_stats_id as topicStatsId,environment as environment,cluster as cluster,broker as broker," +
+ "tenant as tenant,namespace as namespace,bundle as bundle,persistent as persistent," +
+ "topic as topic,producer_count as producerCount,subscription_count as subscriptionCount," +
+ "msg_rate_in as msgRateIn,msg_throughput_in as msgThroughputIn,msg_rate_out as msgRateOut," +
+ "msg_throughput_out as msgThroughputOut,average_msg_size as averageMsgSize,storage_size as storageSize," +
+ "time_stamp as timestamp FROM topics_stats " +
"WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} " +
- "and timestamp=#{timestamp}")
+ "and time_stamp=#{timestamp}")
Page<TopicStatsEntity> findByNamespace(
@Param("environment") String environment,
@Param("tenant") String tenant,
@@ -57,17 +69,17 @@
@Select({"<script>",
"SELECT environment, cluster, tenant, namespace, persistent, topic,"
- + "sum(producerCount) as producerCount,"
- + "sum(subscriptionCount) as subscriptionCount,"
- + "sum(msgRateIn) as msgRateIn,"
- + "sum(msgThroughputIn) as msgThroughputIn,"
- + "sum(msgRateOut) as msgRateOut,"
- + "sum(msgThroughputOut) as msgThroughputOut,"
- + "avg(averageMsgSize) as averageMsgSize,"
- + "sum(storageSize) as storageSize, timestamp FROM topicsStats",
- "WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} and timestamp=#{timestamp} and " +
+ + "sum(producer_count) as producerCount,"
+ + "sum(subscription_count) as subscriptionCount,"
+ + "sum(msg_rate_in) as msgRateIn,"
+ + "sum(msg_throughput_in) as msgThroughputIn,"
+ + "sum(msg_rate_out) as msgRateOut,"
+ + "sum(msg_throughput_out) as msgThroughputOut,"
+ + "avg(average_msg_size) as averageMsgSize,"
+ + "sum(storage_size) as storageSize, time_stamp as timestamp FROM topics_stats",
+ "WHERE environment=#{environment} and tenant=#{tenant} and namespace=#{namespace} and time_stamp=#{timestamp} and " +
"topic IN <foreach collection='topicList' item='topic' open='(' separator=',' close=')'> #{topic} </foreach>" +
- "GROUP BY cluster, persistent, topic" +
+ "GROUP BY environment, cluster, tenant, namespace, persistent, topic, timestamp" +
"</script>"})
Page<TopicStatsEntity> findByMultiTopic(
@Param("environment") String environment,
@@ -77,6 +89,6 @@
@Param("topicList") List<String> topicList,
@Param("timestamp") long timestamp);
- @Delete("DELETE FROM topicsStats WHERE #{nowTime} - #{timeInterval} >= timestamp")
+ @Delete("DELETE FROM topics_stats WHERE #{nowTime} - #{timeInterval} >= time_stamp")
void delete(@Param("nowTime") long nowTime, @Param("timeInterval") long timeInterval);
}
diff --git a/src/main/resources/META-INF/sql/mysql-schema.sql b/src/main/resources/META-INF/sql/mysql-schema.sql
index 023941d..4e6ccd6 100644
--- a/src/main/resources/META-INF/sql/mysql-schema.sql
+++ b/src/main/resources/META-INF/sql/mysql-schema.sql
@@ -21,10 +21,10 @@
broker varchar(1024) NOT NULL,
CONSTRAINT PK_name PRIMARY KEY (name),
UNIQUE (broker)
-) ENGINE=InnoDB CHARACTER SET utf8;
+)ENGINE=InnoDB CHARACTER SET utf8;
-CREATE TABLE IF NOT EXISTS topicsStats (
- topicStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
+CREATE TABLE IF NOT EXISTS topics_stats (
+ topic_stats_id BIGINT PRIMARY KEY AUTO_INCREMENT,
environment varchar(255) NOT NULL,
cluster varchar(255) NOT NULL,
broker varchar(255) NOT NULL,
@@ -33,85 +33,86 @@
bundle varchar(255) NOT NULL,
persistent varchar(36) NOT NULL,
topic varchar(255) NOT NULL,
- producerCount BIGINT,
- subscriptionCount BIGINT,
- msgRateIn double,
- msgThroughputIn double,
- msgRateOut double,
- msgThroughputOut double,
- averageMsgSize double,
- storageSize double,
- timestamp BIGINT
-) ENGINE=InnoDB CHARACTER SET utf8;
+ producer_count BIGINT,
+ subscription_count BIGINT,
+ msg_rate_in double,
+ msg_throughput_in double,
+ msg_rate_out double,
+ msg_throughput_out double,
+ average_msg_size double,
+ storage_size double,
+ time_stamp BIGINT
+)ENGINE=InnoDB CHARACTER SET utf8;
-CREATE TABLE IF NOT EXISTS publishersStats (
- publisherStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- producerId BIGINT,
- topicStatsId BIGINT NOT NULL,
- producerName varchar(255) NOT NULL,
- msgRateIn double,
- msgThroughputIn double,
- averageMsgSize double,
+CREATE TABLE IF NOT EXISTS publishers_stats (
+ publisher_stats_id BIGINT PRIMARY KEY AUTO_INCREMENT,
+ producer_id BIGINT,
+ topic_stats_id BIGINT NOT NULL,
+ producer_name varchar(255) NOT NULL,
+ msg_rate_in double,
+ msg_throughput_in double,
+ average_msg_size double,
address varchar(255),
- connectedSince varchar(128),
- clientVersion varchar(36),
+ connected_since varchar(128),
+ client_version varchar(36),
metadata text,
- timestamp BIGINT,
- CONSTRAINT FK_publishers_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ time_stamp BIGINT,
+ CONSTRAINT FK_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+)ENGINE=InnoDB CHARACTER SET utf8;
-CREATE TABLE IF NOT EXISTS replicationsStats (
- replicationStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- topicStatsId BIGINT NOT NULL,
+CREATE TABLE IF NOT EXISTS replications_stats (
+ replication_stats_id BIGINT PRIMARY KEY AUTO_INCREMENT,
+ topic_stats_id BIGINT NOT NULL,
cluster varchar(255) NOT NULL,
connected BOOLEAN,
- msgRateIn double,
- msgRateOut double,
- msgRateExpired double,
- msgThroughputIn double,
- msgThroughputOut double,
- msgRateRedeliver double,
- replicationBacklog BIGINT,
- replicationDelayInSeconds BIGINT,
- inboundConnection varchar(255),
- inboundConnectedSince varchar(255),
- outboundConnection varchar(255),
- outboundConnectedSince varchar(255),
- timestamp BIGINT,
- CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ msg_rate_in double,
+ msg_rate_out double,
+ msg_rate_expired double,
+ msg_throughput_in double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ replication_backlog BIGINT,
+ replication_delay_in_seconds BIGINT,
+ inbound_connection varchar(255),
+ inbound_connected_since varchar(255),
+ outbound_connection varchar(255),
+ outbound_connected_since varchar(255),
+ time_stamp BIGINT,
+ CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+)ENGINE=InnoDB CHARACTER SET utf8;
-CREATE TABLE IF NOT EXISTS subscriptionsStats (
- subscriptionStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
- topicStatsId BIGINT NOT NULL,
+CREATE TABLE IF NOT EXISTS subscriptions_stats (
+ subscription_stats_id BIGINT PRIMARY KEY AUTO_INCREMENT,
+ topic_stats_id BIGINT NOT NULL,
subscription varchar(255) NULL,
- msgBacklog BIGINT,
- msgRateExpired double,
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- numberOfEntriesSinceFirstNotAckedMessage BIGINT,
- totalNonContiguousDeletedMessagesRange BIGINT,
- subscriptionType varchar(16),
- blockedSubscriptionOnUnackedMsgs BOOLEAN,
- timestamp BIGINT,
- UNIQUE (topicStatsId, subscription),
- CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
-) ENGINE=InnoDB CHARACTER SET utf8;
+ msg_backlog BIGINT,
+ msg_rate_expired double,
+ msg_rate_out double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ number_of_entries_since_first_not_acked_message BIGINT,
+ total_non_contiguous_deleted_messages_range BIGINT,
+ subscription_type varchar(16),
+ blocked_subscription_on_unacked_msgs BOOLEAN,
+ time_stamp BIGINT,
+ UNIQUE (topic_stats_id, subscription),
+ CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+)ENGINE=InnoDB CHARACTER SET utf8;
-CREATE TABLE IF NOT EXISTS consumersStats (
- consumerStatsId BIGINT PRIMARY KEY AUTO_INCREMENT,
+CREATE TABLE IF NOT EXISTS consumers_stats (
+ consumer_stats_id BIGINT PRIMARY KEY AUTO_INCREMENT,
consumer varchar(255) NOT NULL,
- topicStatsId BIGINT NOT NUll,
- replicationStatsId BIGINT,
- subscriptionStatsId BIGINT,
+ topic_stats_id BIGINT NOT NUll,
+ replication_stats_id BIGINT,
+ subscription_stats_id BIGINT,
address varchar(255),
- availablePermits BIGINT,
- connectedSince varchar(255),
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- clientVersion varchar(36),
- timestamp BIGINT,
+ available_permits BIGINT,
+ connected_since varchar(255),
+ msg_rate_out double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ client_version varchar(36),
+ time_stamp BIGINT,
metadata text
-) ENGINE=InnoDB CHARACTER SET utf8;
+)ENGINE=InnoDB CHARACTER SET utf8;
+
diff --git a/src/main/resources/META-INF/sql/postgresql-schema.sql b/src/main/resources/META-INF/sql/postgresql-schema.sql
new file mode 100644
index 0000000..8048bb3
--- /dev/null
+++ b/src/main/resources/META-INF/sql/postgresql-schema.sql
@@ -0,0 +1,117 @@
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE DATABASE pulsar_manager;
+
+\c pulsar_manager;
+
+CREATE TABLE IF NOT EXISTS environments (
+ name varchar(256) NOT NULL,
+ broker varchar(1024) NOT NULL,
+ CONSTRAINT PK_name PRIMARY KEY (name),
+ UNIQUE (broker)
+);
+
+CREATE TABLE IF NOT EXISTS topics_stats (
+ topic_stats_id BIGSERIAL PRIMARY KEY,
+ environment varchar(255) NOT NULL,
+ cluster varchar(255) NOT NULL,
+ broker varchar(255) NOT NULL,
+ tenant varchar(255) NOT NULL,
+ namespace varchar(255) NOT NULL,
+ bundle varchar(255) NOT NULL,
+ persistent varchar(36) NOT NULL,
+ topic varchar(255) NOT NULL,
+ producer_count BIGINT,
+ subscription_count BIGINT,
+ msg_rate_in double precision ,
+ msg_throughput_in double precision ,
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ average_msg_size double precision ,
+ storage_size double precision ,
+ time_stamp BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS publishers_stats (
+ publisher_stats_id BIGSERIAL PRIMARY KEY,
+ producer_id BIGINT,
+ topic_stats_id BIGINT NOT NULL,
+ producer_name varchar(255) NOT NULL,
+ msg_rate_in double precision ,
+ msg_throughput_in double precision ,
+ average_msg_size double precision ,
+ address varchar(255),
+ connected_since varchar(128),
+ client_version varchar(36),
+ metadata text,
+ time_stamp BIGINT,
+ CONSTRAINT fk_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
+
+CREATE TABLE IF NOT EXISTS replications_stats (
+ replication_stats_id BIGSERIAL PRIMARY KEY,
+ topic_stats_id BIGINT NOT NULL,
+ cluster varchar(255) NOT NULL,
+ connected BOOLEAN,
+ msg_rate_in double precision ,
+ msg_rate_out double precision ,
+ msg_rate_expired double precision ,
+ msg_throughput_in double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ replication_backlog BIGINT,
+ replication_delay_in_seconds BIGINT,
+ inbound_connection varchar(255),
+ inbound_connected_since varchar(255),
+ outbound_connection varchar(255),
+ outbound_connected_since varchar(255),
+ time_stamp BIGINT,
+ CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
+
+CREATE TABLE IF NOT EXISTS subscriptions_stats (
+ subscription_stats_id BIGSERIAL PRIMARY KEY,
+ topic_stats_id BIGINT NOT NULL,
+ subscription varchar(255) NULL,
+ msg_backlog BIGINT,
+ msg_rate_expired double precision ,
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ number_of_entries_since_first_not_acked_message BIGINT,
+ total_non_contiguous_deleted_messages_range BIGINT,
+ subscription_type varchar(16),
+ blocked_subscription_on_unacked_msgs BOOLEAN,
+ time_stamp BIGINT,
+ UNIQUE (topic_stats_id, subscription),
+ CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
+);
+
+CREATE TABLE IF NOT EXISTS consumers_stats (
+ consumer_stats_id BIGSERIAL PRIMARY KEY,
+ consumer varchar(255) NOT NULL,
+ topic_stats_id BIGINT NOT NUll,
+ replication_stats_id BIGINT,
+ subscription_stats_id BIGINT,
+ address varchar(255),
+ available_permits BIGINT,
+ connected_since varchar(255),
+ msg_rate_out double precision ,
+ msg_throughput_out double precision ,
+ msg_rate_redeliver double precision ,
+ client_version varchar(36),
+ time_stamp BIGINT,
+ metadata text
+);
diff --git a/src/main/resources/META-INF/sql/sqlite-schema.sql b/src/main/resources/META-INF/sql/sqlite-schema.sql
index dfc8c7c..674a032 100644
--- a/src/main/resources/META-INF/sql/sqlite-schema.sql
+++ b/src/main/resources/META-INF/sql/sqlite-schema.sql
@@ -19,8 +19,8 @@
UNIQUE (broker)
);
-CREATE TABLE IF NOT EXISTS topicsStats (
- topicStatsId INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics_stats (
+ topic_stats_id INTEGER PRIMARY KEY AUTOINCREMENT,
environment varchar(255) NOT NULL,
cluster varchar(255) NOT NULL,
broker varchar(255) NOT NULL,
@@ -29,85 +29,85 @@
bundle varchar(255) NOT NULL,
persistent varchar(36) NOT NULL,
topic varchar(255) NOT NULL,
- producerCount INTEGER,
- subscriptionCount INTEGER,
- msgRateIn double,
- msgThroughputIn double,
- msgRateOut double,
- msgThroughputOut double,
- averageMsgSize double,
- storageSize double,
- timestamp integer
+ producer_count INTEGER,
+ subscription_count INTEGER,
+ msg_rate_in double,
+ msg_throughput_in double,
+ msg_rate_out double,
+ msg_throughput_out double,
+ average_msg_size double,
+ storage_size double,
+ time_stamp integer
);
-CREATE TABLE IF NOT EXISTS publishersStats (
- publisherStatsId INTEGER PRIMARY KEY AUTOINCREMENT,
- producerId INTEGER,
- topicStatsId INTEGER NOT NULL,
- producerName varchar(255) NOT NULL,
- msgRateIn double,
- msgThroughputIn double,
- averageMsgSize double,
+CREATE TABLE IF NOT EXISTS publishers_stats (
+ publisher_stats_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ producer_id INTEGER,
+ topic_stats_id INTEGER NOT NULL,
+ producer_name varchar(255) NOT NULL,
+ msg_rate_in double,
+ msg_throughput_in double,
+ average_msg_size double,
address varchar(255),
- connectedSince varchar(128),
- clientVersion varchar(36),
+ connected_since varchar(128),
+ client_version varchar(36),
metadata text,
- timestamp integer,
- CONSTRAINT FK_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
+ time_stamp integer,
+ CONSTRAINT FK_publishers_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
);
-CREATE TABLE IF NOT EXISTS replicationsStats (
- replicationStatsId INTEGER PRIMARY KEY AUTOINCREMENT,
- topicStatsId INTEGER NOT NULL,
+CREATE TABLE IF NOT EXISTS replications_stats (
+ replication_stats_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ topic_stats_id INTEGER NOT NULL,
cluster varchar(255) NOT NULL,
connected false,
- msgRateIn double,
- msgRateOut double,
- msgRateExpired double,
- msgThroughputIn double,
- msgThroughputOut double,
- msgRateRedeliver double,
- replicationBacklog INTEGER,
- replicationDelayInSeconds integer,
- inboundConnection varchar(255),
- inboundConnectedSince varchar(255),
- outboundConnection varchar(255),
- outboundConnectedSince varchar(255),
- timestamp integer,
- CONSTRAINT FK_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
+ msg_rate_in double,
+ msg_rate_out double,
+ msg_rate_expired double,
+ msg_throughput_in double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ replication_backlog INTEGER,
+ replication_delay_in_seconds integer,
+ inbound_connection varchar(255),
+ inbound_connected_since varchar(255),
+ outbound_connection varchar(255),
+ outbound_connected_since varchar(255),
+ time_stamp integer,
+ CONSTRAINT FK_replications_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
);
-CREATE TABLE IF NOT EXISTS subscriptionsStats (
- subscriptionStatsId INTEGER PRIMARY KEY AUTOINCREMENT,
- topicStatsId INTEGER NOT NULL,
+CREATE TABLE IF NOT EXISTS subscriptions_stats (
+ subscription_stats_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ topic_stats_id INTEGER NOT NULL,
subscription varchar(255) NULL,
- msgBacklog integer,
- msgRateExpired double,
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- numberOfEntriesSinceFirstNotAckedMessage integer,
- totalNonContiguousDeletedMessagesRange integer,
- subscriptionType varchar(16),
- blockedSubscriptionOnUnackedMsgs false,
- timestamp integer,
- UNIQUE (topicStatsId, subscription),
- CONSTRAINT FK_topic_stats_id FOREIGN KEY (topicStatsId) References topicsStats(topicStatsId)
+ msg_backlog integer,
+ msg_rate_expired double,
+ msg_rate_out double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ number_of_entries_since_first_not_acked_message integer,
+ total_non_contiguous_deleted_messages_range integer,
+ subscription_type varchar(16),
+ blocked_subscription_on_unacked_msgs false,
+ time_stamp integer,
+ UNIQUE (topic_stats_id, subscription),
+ CONSTRAINT FK_subscriptions_stats_topic_stats_id FOREIGN KEY (topic_stats_id) References topics_stats(topic_stats_id)
);
-CREATE TABLE IF NOT EXISTS consumersStats (
- consumerStatsId INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS consumers_stats (
+ consumer_stats_id INTEGER PRIMARY KEY AUTOINCREMENT,
consumer varchar(255) NOT NULL,
- topicStatsId INTEGER NOT NUll,
- replicationStatsId integer,
- subscriptionStatsId integer,
+ topic_stats_id INTEGER NOT NUll,
+ replication_stats_id integer,
+ subscription_stats_id integer,
address varchar(255),
- availablePermits integer,
- connectedSince varchar(255),
- msgRateOut double,
- msgThroughputOut double,
- msgRateRedeliver double,
- clientVersion varchar(36),
- timestamp integer,
+ available_permits integer,
+ connected_since varchar(255),
+ msg_rate_out double,
+ msg_throughput_out double,
+ msg_rate_redeliver double,
+ client_version varchar(36),
+ time_stamp integer,
metadata text
);
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 173d2ea..76dc60f 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -15,6 +15,10 @@
spring.cloud.refresh.refreshable=none
server.port=7750
+# configuration log
+logging.path=
+logging.file=pulsar-manager.log
+
mybatis.type-aliases-package=io.streamnative.pulsar.manager
# database connection
@@ -24,10 +28,12 @@
spring.datasource.schema=classpath:/META-INF/sql/sqlite-schema.sql
spring.datasource.username=
spring.datasource.password=
-spring.datasource.max-idle=10
-spring.datasource.max-wait=10000
-spring.datasource.min-idle=5
-spring.datasource.initial-size=5
+
+# postgresql configuration
+#spring.datasource.driver-class-name=org.postgresql.Driver
+#spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/pulsar_manager
+#spring.datasource.username=postgres
+#spring.datasource.password=postgres
# zuul config
zuul.routes.admin.path=/admin/**