Support management of jwt for pulsar-manager (#205)

* ### Modifications
Support JWT management

### Modifications
* Support management token, add, delete, view
* Support for namespace authorization
* support for topic and partition topic authorization

diff --git a/.gitignore b/.gitignore
index 5b37fc9..df371de 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,6 +44,8 @@
 *.db
 front-end/data/
 *.log
+*.gz
 dbdata
 src/main/resources/local.properties
 
+*.key
\ No newline at end of file
diff --git a/README.md b/README.md
index f959ec5..019f28d 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,7 @@
 * Brokers Management
 * Clusters Management
 * Dynamic environments with multiple changes
+* Support JWT Auth
 
 ## Feature preview
 
@@ -60,6 +61,10 @@
 
 ![pulsar-manager-topics-monitors](docs/img/pulsar-manager-topics-monitors.gif)
 
+### Manage token
+
+![pulsar-manager-token](docs/img/pulsar-manager-token.gif)
+
 
 ## Prerequisites
 * Java 8 or later
diff --git a/build.gradle b/build.gradle
index 81a6699..cee3a25 100644
--- a/build.gradle
+++ b/build.gradle
@@ -86,7 +86,9 @@
     compile group: 'org.postgresql', name: 'postgresql', version: postgresqlVersion
     compile group: 'org.herddb', name: 'herddb-jdbc', version: herddbVersion
     compile group: 'javax.validation', name: 'validation-api', version: javaxValidationVersion
-    compile group: 'io.jsonwebtoken', name: 'jjwt', version: jsonWebTokenVersion
+    compile group: 'io.jsonwebtoken', name: 'jjwt-api', version: jsonWebTokenApiVersion
+    compile group: 'io.jsonwebtoken', name: 'jjwt-impl', version: jsonWebTokenImplVersion
+    compile group: 'io.jsonwebtoken', name: 'jjwt-jackson', version: jsonWebTokenImplVersion
     compile group: 'org.xerial', name: 'sqlite-jdbc', version: sqliteVersion
     compile group: 'com.github.pagehelper', name: 'pagehelper-spring-boot-starter', version: pageHelperVersion
     compile group: 'org.mockito', name: 'mockito-core', version: mockitoVersion
@@ -97,5 +99,6 @@
     compile group: 'io.springfox', name: 'springfox-swagger-ui', version: swaggeruiVersion
     compile group: 'org.powermock', name: 'powermock-api-mockito', version: apiMockitoVersion
     compile group: 'org.powermock', name: 'powermock-module-junit4', version: mockitoJunit4Version
+    compile group: 'org.apache.pulsar', name: 'pulsar-broker', version: brokerVersion
     compileOnly group: 'org.projectlombok', name: 'lombok', version: lombokVersion
 }
diff --git a/docker/init_db.sql b/docker/init_db.sql
index ce02ea4..9f3459b 100644
--- a/docker/init_db.sql
+++ b/docker/init_db.sql
@@ -115,4 +115,12 @@
   client_version varchar(36),
   time_stamp BIGINT,
   metadata text
+);
+
+CREATE TABLE IF NOT EXISTS tokens (
+  token_id BIGSERIAL PRIMARY KEY,
+  role varchar(256) NOT NULL,
+  description varchar(128),
+  token varchar(1024) NOT NUll,
+  UNIQUE (role)
 );
\ No newline at end of file
diff --git a/docs/img/pulsar-manager-token.gif b/docs/img/pulsar-manager-token.gif
new file mode 100644
index 0000000..10d71ac
--- /dev/null
+++ b/docs/img/pulsar-manager-token.gif
Binary files differ
diff --git a/front-end/src/api/tokens.js b/front-end/src/api/tokens.js
new file mode 100644
index 0000000..4844cf2
--- /dev/null
+++ b/front-end/src/api/tokens.js
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import request from '@/utils/request'
+
+const SPRING_BASE_URL = '/pulsar-manager'
+
+export function fetchTokens(query) {
+  return request({
+    url: SPRING_BASE_URL + '/tokens',
+    method: 'get',
+    params: { query }
+  })
+}
+
+export function putToken(data) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + '/tokens/token',
+    method: 'put',
+    data
+  })
+}
+
+export function updateToken(data) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + '/tokens/token',
+    method: 'post',
+    data
+  })
+}
+
+export function deleteToken(role) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + `/tokens/${role}`,
+    method: 'delete'
+  })
+}
+
+export function getToken(role) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + `/tokens/${role}`,
+    method: 'get'
+  })
+}
diff --git a/front-end/src/api/topics.js b/front-end/src/api/topics.js
index 0d69feb..daf9946 100644
--- a/front-end/src/api/topics.js
+++ b/front-end/src/api/topics.js
@@ -163,6 +163,40 @@
   })
 }
 
+export function getPermissionsOnCluster(cluster, persistent, tenantNamespaceTopic) {
+  return request({
+    headers: {
+      'Content-Type': 'application/json',
+      'x-pulsar-cluster': cluster
+    },
+    url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/permissions`,
+    method: 'get'
+  })
+}
+
+export function grantPermissionsOnCluster(cluster, persistent, tenantNamespaceTopic, role, data) {
+  return request({
+    headers: {
+      'Content-Type': 'application/json',
+      'x-pulsar-cluster': cluster
+    },
+    url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/permissions/${role}`,
+    method: 'post',
+    data
+  })
+}
+
+export function revokePermissionsOnCluster(cluster, persistent, tenantNamespaceTopic, role) {
+  return request({
+    headers: {
+      'Content-Type': 'application/json',
+      'x-pulsar-cluster': cluster
+    },
+    url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/permissions/${role}`,
+    method: 'delete'
+  })
+}
+
 export function getPermissions(tenantNamespaceTopic) {
   return request({
     url: BASE_URL_V2 + `/persistent/${tenantNamespaceTopic}/permissions`,
diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js
index 4551347..3812850 100644
--- a/front-end/src/lang/en.js
+++ b/front-end/src/lang/en.js
@@ -484,6 +484,7 @@
       updateReplicatedClustersSuccess: 'Successfully update replicated clusters for this namespace!',
       updateSubscriptionAuthModeSuccess: 'Successfully update subscription authentication mode for this namespace!',
       addRoleSuccess: 'Successfully add a role for this namespace!',
+      addRoleActionsSuccess: 'Successfully add a role action for this namespace!',
       removeRoleSuccess: 'Successfully remove a role from this namespace!',
       updateSchemaAutoUpdateStrategySuccess: 'Successfully change schema auto-update strategy for this namespace!',
       enableSchemaValidationEnforcedSuccess: 'Successfully enable schema validation for this namespace!',
@@ -746,5 +747,18 @@
     deleteEnvSuccessNotification: 'Successfully delete an environment',
     updateEnvSuccessNotification: 'Successfully update an environment',
     manageEnvs: 'Manage Environments'
+  },
+  token: {
+    buttonNewToken: 'New Token',
+    colHeadingRole: 'Role',
+    colHeadingToken: 'Token',
+    colHeadingDesc: 'Description',
+    newTokenRolePlaceholder: 'Please input role name',
+    newTokenDialogCaption: 'New Token',
+    newTokenDescPlaceholder: 'Please input description for role',
+    newTokenRequiredMessage: 'Role is required',
+    addTokenSuccessNotification: 'Successfully add a token for role',
+    deleteTokenSuccessNotification: 'Successfully delete a token for role',
+    updateTokenSucccessNotification: 'Successfully update a token for role'
   }
 }
diff --git a/front-end/src/lang/zh.js b/front-end/src/lang/zh.js
index 3ccbaa3..c7c15d7 100644
--- a/front-end/src/lang/zh.js
+++ b/front-end/src/lang/zh.js
@@ -484,6 +484,7 @@
       updateReplicatedClustersSuccess: 'Successfully update replicated clusters for this namespace!',
       updateSubscriptionAuthModeSuccess: 'Successfully update subscription authentication mode for this namespace!',
       addRoleSuccess: 'Successfully add a role for this namespace!',
+      addRoleActionsSuccess: 'Successfully add a role action for this namespace!',
       removeRoleSuccess: 'Successfully remove a role from this namespace!',
       updateSchemaAutoUpdateStrategySuccess: 'Successfully change schema auto-update strategy for this namespace!',
       enableSchemaValidationEnforcedSuccess: 'Successfully enable schema validation for this namespace!',
@@ -744,5 +745,18 @@
     deleteEnvSuccessNotification: 'Successfully delete an environment',
     updateEnvSuccessNotification: 'Successfully update an environment',
     manageEnvs: 'Manage Environments'
+  },
+  token: {
+    buttonNewToken: 'New Token',
+    colHeadingRole: 'Role',
+    colHeadingToken: 'Token',
+    colHeadingDesc: 'Description',
+    newTokenRolePlaceholder: 'Please input role name',
+    newTokenDialogCaption: 'New Token',
+    newTokenDescPlaceholder: 'Please input description for role',
+    newTokenRequiredMessage: 'Role is required',
+    addTokenSuccessNotification: 'Successfully add a token for role',
+    deleteTokenSuccessNotification: 'Successfully delete a token for role',
+    updateTokenSucccessNotification: 'Successfully update a token for role'
   }
 }
diff --git a/front-end/src/router/index.js b/front-end/src/router/index.js
index 78967b0..4b27788 100644
--- a/front-end/src/router/index.js
+++ b/front-end/src/router/index.js
@@ -226,6 +226,13 @@
         name: 'Bookies',
         meta: { title: 'Bookies', noCache: true },
         hidden: true
+      },
+      {
+        path: '/tokens',
+        component: () => import('@/views/management/tokens/index'),
+        name: 'Tokens',
+        meta: { title: 'Tokens', noCache: true },
+        hidden: false
       }
     ]
   }
diff --git a/front-end/src/views/management/namespaces/namespace.vue b/front-end/src/views/management/namespaces/namespace.vue
index eb640cc..67fc706 100644
--- a/front-end/src/views/management/namespaces/namespace.vue
+++ b/front-end/src/views/management/namespaces/namespace.vue
@@ -185,7 +185,7 @@
               :placeholder="$t('namespace.policy.selectRole')"
               multiple
               style="width:300px;"
-              @change="handleChangeOptions()">
+              @change="handleChangeOptions(tag)">
               <el-option
                 v-for="item in roleMapOptions[tag]"
                 :key="item.value"
@@ -1090,7 +1090,8 @@
       getPermissions(tenantNamespace).then(response => {
         if (!response.data) return
         for (var key in response.data) {
-          this.roleMap[key] = response.data.key
+          this.dynamicTags.push(key)
+          this.roleMap[key] = response.data[key]
           this.roleMapOptions[key] = this.roleOptions
         }
       })
@@ -1255,7 +1256,15 @@
       this.inputVisible = false
       this.inputValue = ''
     },
-    handleChangeOptions() {
+    handleChangeOptions(role) {
+      grantPermissions(this.tenantNamespace, role, this.roleMap[role]).then(response => {
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('namespace.notification.addRoleActionsSuccess'),
+          type: 'success',
+          duration: 3000
+        })
+      })
       this.$forceUpdate()
     },
     revokeAllRole() {
diff --git a/front-end/src/views/management/tokens/index.vue b/front-end/src/views/management/tokens/index.vue
new file mode 100644
index 0000000..7d6ee16
--- /dev/null
+++ b/front-end/src/views/management/tokens/index.vue
@@ -0,0 +1,258 @@
+<template>
+  <div class="app-container">
+    <el-button type="primary" icon="el-icon-plus" @click="handleCreateToken">{{ $t('token.buttonNewToken') }}</el-button>
+
+    <el-row :gutter="24">
+      <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}" :lg="{span: 24}" :xl="{span: 24}" style="margin-top:15px">
+        <el-table
+          v-loading="tokenListLoading"
+          :key="tokenTableKey"
+          :data="tokenList"
+          border
+          fit
+          highlight-current-row
+          style="width: 100%;">
+          <el-table-column :label="$t('token.colHeadingRole')" min-width="50px" align="center">
+            <template slot-scope="scope">
+              <span>{{ scope.row.role }}</span>
+            </template>
+          </el-table-column>
+          <el-table-column :label="$t('token.colHeadingToken')" align="center" min-width="100px">
+            <template slot-scope="scope">
+              <el-link @click="handleGetToken(scope.row)">{{ scope.row.token }}<i class="el-icon-view el-icon--right"/></el-link>
+            </template>
+          </el-table-column>
+          <el-table-column :label="$t('token.colHeadingDesc')" align="center" min-width="100px">
+            <template slot-scope="scope">
+              <span>{{ scope.row.description }}</span>
+            </template>
+          </el-table-column>
+          <el-table-column :label="$t('table.actions')" align="center" class-name="small-padding fixed-width">
+            <template slot-scope="scope">
+              <el-button type="primary" size="mini" @click="handleUpdateToken(scope.row)">{{ $t('table.edit') }}</el-button>
+              <el-button size="mini" type="danger" @click="handleDeleteToken(scope.row)">{{ $t('table.delete') }}</el-button>
+            </template>
+          </el-table-column>
+        </el-table>
+      </el-col>
+    </el-row>
+
+    <el-dialog :title="textMap[dialogStatus]" :visible.sync="dialogFormVisible" width="30%">
+      <el-form ref="form" :rules="rules" :model="form" label-position="top">
+        <el-form-item v-if="dialogStatus==='create'" :label="$t('token.colHeadingRole')" prop="role">
+          <el-input v-model="form.role" :placeholder="$t('token.newTokenRolePlaceholder')"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='create'" :label="$t('token.colHeadingDesc')">
+          <el-input :rows="2" v-model="form.description" :placeholder="$t('token.newTokenDescPlaceholder')" type="textarea"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='update'" :label="$t('token.colHeadingRole')">
+          <el-tag type="primary" size="medium">{{ form.role }}</el-tag>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='update'" :label="$t('token.colHeadingDesc')">
+          <el-input v-model="form.description" :placeholder="$t('token.newTokenDescPlaceholder')" type="textarea"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='delete'">
+          <h4>Delete Role {{ form.role }}</h4>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='get'" :label="$t('token.colHeadingRole')">
+          <el-tag type="primary" size="medium">{{ form.role }}</el-tag>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='get'" :label="$t('token.colHeadingToken')">
+          <span type="primary" size="medium">{{ form.token }}</span>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='get'" :label="$t('token.colHeadingDesc')">
+          <span>{{ form.description }}</span>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus!=='get'">
+          <el-button type="primary" @click="handleOptions()">{{ $t('table.confirm') }}</el-button>
+          <el-button @click="dialogFormVisible = false">{{ $t('table.cancel') }}</el-button>
+        </el-form-item>
+      </el-form>
+    </el-dialog>
+  </div>
+</template>
+
+<script>
+import { putToken, fetchTokens, updateToken, deleteToken, getToken } from '@/api/tokens'
+
+export default {
+  name: 'TokensInfo',
+  data() {
+    return {
+      tokenList: [],
+      tokenTableKey: 0,
+      tokenListLoading: false,
+      textMap: {
+        create: 'New Token',
+        delete: 'Delete Token',
+        update: 'Update Token'
+      },
+      dialogFormVisible: false,
+      dialogStatus: '',
+      form: {
+        token: '',
+        role: '',
+        description: ''
+      },
+      temp: {
+        'token': '',
+        'role': '',
+        'description': ''
+      },
+      description: '',
+      rules: {
+        token: [{ required: true, message: this.$i18n.t('token.newTokenRequiredMessage'), trigger: 'blur' }]
+      }
+    }
+  },
+  created() {
+    this.getTokens()
+  },
+  methods: {
+    getTokens() {
+      fetchTokens().then(response => {
+        if (!response.data) return
+        this.tokenList = []
+        for (var i = 0; i < response.data.data.length; i++) {
+          this.tokenList.push({
+            'token': 'view',
+            'role': response.data.data[i].role,
+            'description': response.data.data[i].description
+          })
+        }
+      })
+    },
+    handleCreateToken() {
+      this.form.token = ''
+      this.form.role = ''
+      this.form.description = ''
+      this.dialogFormVisible = true
+      this.dialogStatus = 'create'
+    },
+    handleDeleteToken(row) {
+      this.temp.token = row.token
+      this.temp.role = row.role
+      this.temp.description = row.description
+      this.dialogFormVisible = true
+      this.dialogStatus = 'delete'
+    },
+    handleUpdateToken(row) {
+      this.form.role = row.role
+      this.form.description = row.description
+      this.dialogFormVisible = true
+      this.dialogStatus = 'update'
+    },
+    handleOptions() {
+      this.$refs['form'].validate((valid) => {
+        if (valid) {
+          switch (this.dialogStatus) {
+            case 'create':
+              this.createToken()
+              break
+            case 'delete':
+              this.deleteToken()
+              break
+            case 'update':
+              this.updateToken()
+              break
+          }
+        }
+      })
+    },
+    createToken() {
+      const data = {
+        'role': this.form.role,
+        'token': '',
+        'description': this.form.description
+      }
+      putToken(data).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('token.addTokenSuccessNotification'),
+          type: 'success',
+          duration: 2000
+        })
+        this.dialogFormVisible = false
+        this.getTokens()
+      })
+    },
+    deleteToken() {
+      deleteToken(this.temp.role).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('token.deleteTokenSuccessNotification'),
+          type: 'success',
+          duration: 2000
+        })
+        this.getTokens()
+        this.dialogFormVisible = false
+      })
+    },
+    updateToken() {
+      const data = {
+        'role': this.form.role,
+        'description': this.form.description
+      }
+      updateToken(data).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('token.updateTokenSucccessNotification'),
+          type: 'success',
+          duration: 2000
+        })
+        this.getTokens()
+        this.dialogFormVisible = false
+      })
+    },
+    handleGetToken(row) {
+      getToken(row.role).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.form.token = response.data.token
+        this.form.role = response.data.role
+        this.form.description = response.data.description
+        this.dialogFormVisible = true
+        this.dialogStatus = 'get'
+      })
+    }
+  }
+}
+</script>
diff --git a/front-end/src/views/management/topics/partitionedTopic.vue b/front-end/src/views/management/topics/partitionedTopic.vue
index c86e9fa..70d786c 100644
--- a/front-end/src/views/management/topics/partitionedTopic.vue
+++ b/front-end/src/views/management/topics/partitionedTopic.vue
@@ -201,7 +201,7 @@
               :placeholder="$t('topic.selectRoleMessage')"
               multiple
               style="width:300px;"
-              @change="handleChangeOptions()">
+              @change="handleChangeOptions(tag)">
               <el-option
                 v-for="item in roleMapOptions[tag]"
                 :key="item.value"
@@ -270,7 +270,10 @@
   deletePartitionTopicOnCluster,
   expireMessagesAllSubscriptionsOnCluster,
   resetCursorByTimestampOnCluster,
-  clearBacklogOnCluster
+  clearBacklogOnCluster,
+  getPermissionsOnCluster,
+  grantPermissionsOnCluster,
+  revokePermissionsOnCluster
 } from '@/api/topics'
 import { fetchTopicsByPulsarManager } from '@/api/topics'
 import Pagination from '@/components/Pagination' // Secondary package based on el-pagination
@@ -369,6 +372,7 @@
     this.getTopicsList()
     this.getReplicatedClusters()
     this.initTopicStats()
+    this.initPermissions()
   },
   methods: {
     getRemoteTenantsList() {
@@ -505,6 +509,16 @@
     getCurrentCluster() {
       return this.clusterForm.cluster || ''
     },
+    initPermissions() {
+      getPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.tenantNamespaceTopic).then(response => {
+        if (!response.data) return
+        for (var key in response.data) {
+          this.dynamicTags.push(key)
+          this.roleMap[key] = response.data[key]
+          this.roleMapOptions[key] = this.roleOptions
+        }
+      })
+    },
     handleClick(tab, event) {
       this.currentTabName = tab.name
       this.$router.push({ query: {
@@ -514,6 +528,16 @@
     },
     handleClose(tag) {
       this.dynamicTags.splice(this.dynamicTags.indexOf(tag), 1)
+      revokePermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.tenantNamespaceTopic, tag).then(response => {
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('namespace.notification.removeRoleSuccess'),
+          type: 'success',
+          duration: 3000
+        })
+        delete this.roleMap[tag]
+        delete this.roleMapOptions[tag]
+      })
     },
     showInput() {
       this.inputVisible = true
@@ -533,22 +557,30 @@
           this.inputValue = ''
           return
         }
-        // grantPermissions(this.currentNamespace, inputValue, this.roleMap[inputValue]).then(response => {
-        //   this.$notify({
-        //     title: 'success',
-        //     message: 'Add success',
-        //     type: 'success',
-        //     duration: 3000
-        //   })
-        //   this.dynamicTags.push(inputValue)
-        //   this.roleMap[inputValue] = []
-        //   this.roleMapOptions[inputValue] = this.roleOptions
-        // })
+        grantPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.tenantNamespaceTopic, inputValue, this.roleMap[inputValue]).then(response => {
+          this.$notify({
+            title: 'success',
+            message: this.$i18n.t('namespace.notification.addRoleSuccess'),
+            type: 'success',
+            duration: 3000
+          })
+          this.dynamicTags.push(inputValue)
+          this.roleMap[inputValue] = []
+          this.roleMapOptions[inputValue] = this.roleOptions
+        })
       }
       this.inputVisible = false
       this.inputValue = ''
     },
-    handleChangeOptions() {
+    handleChangeOptions(role) {
+      grantPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.tenantNamespaceTopic, role, this.roleMap[role]).then(response => {
+        this.$notify({
+          title: 'success',
+          message: 'Set permissions success',
+          type: 'success',
+          duration: 3000
+        })
+      })
       this.$forceUpdate()
     },
     handleDeletePartitionTopic() {
@@ -646,6 +678,12 @@
 </script>
 
 <style>
+.role-el-tag {
+  background-color: #fff !important;
+  border: none !important;
+  font-size: 16px !important;
+  color: black !important;
+}
 .split-line {
   background: #e6e9f3;
   border: none;
diff --git a/front-end/src/views/management/topics/topic.vue b/front-end/src/views/management/topics/topic.vue
index 269d375..289ba59 100644
--- a/front-end/src/views/management/topics/topic.vue
+++ b/front-end/src/views/management/topics/topic.vue
@@ -452,7 +452,7 @@
               multiple
               placeholder="Please Select Options"
               style="width:300px;"
-              @change="handleChangeOptions()">
+              @change="handleChangeOptions(tag)">
               <el-option
                 v-for="item in roleMapOptions[tag]"
                 :key="item.value"
@@ -511,7 +511,10 @@
   compactionStatusOnCluster,
   offloadOnCluster,
   offloadStatusOnCluster,
-  deleteTopicOnCluster
+  deleteTopicOnCluster,
+  getPermissionsOnCluster,
+  grantPermissionsOnCluster,
+  revokePermissionsOnCluster
 } from '@/api/topics'
 import Pagination from '@/components/Pagination' // Secondary package based on el-pagination
 import { formatBytes } from '@/utils/index'
@@ -654,6 +657,7 @@
       this.getOffloadStatus()
     }
     this.loaded = true
+    this.initPermissions()
   },
   methods: {
     onClusterChanged() {
@@ -896,8 +900,28 @@
         this.initTerminateAndSegments()
       })
     },
+    initPermissions() {
+      getPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.getFullTopic()).then(response => {
+        if (!response.data) return
+        for (var key in response.data) {
+          this.dynamicTags.push(key)
+          this.roleMap[key] = response.data[key]
+          this.roleMapOptions[key] = this.roleOptions
+        }
+      })
+    },
     handleClose(tag) {
       this.dynamicTags.splice(this.dynamicTags.indexOf(tag), 1)
+      revokePermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.getFullTopic(), tag).then(response => {
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('namespace.notification.removeRoleSuccess'),
+          type: 'success',
+          duration: 3000
+        })
+        delete this.roleMap[tag]
+        delete this.roleMapOptions[tag]
+      })
     },
     showInput() {
       this.inputVisible = true
@@ -917,22 +941,30 @@
           this.inputValue = ''
           return
         }
-        // grantPermissions(this.currentNamespace, inputValue, this.roleMap[inputValue]).then(response => {
-        //   this.$notify({
-        //     title: 'success',
-        //     message: 'Add success',
-        //     type: 'success',
-        //     duration: 3000
-        //   })
-        //   this.dynamicTags.push(inputValue)
-        //   this.roleMap[inputValue] = []
-        //   this.roleMapOptions[inputValue] = this.roleOptions
-        // })
+        grantPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.getFullTopic(), inputValue, this.roleMap[inputValue]).then(response => {
+          this.$notify({
+            title: 'success',
+            message: this.$i18n.t('namespace.notification.addRoleSuccess'),
+            type: 'success',
+            duration: 3000
+          })
+          this.dynamicTags.push(inputValue)
+          this.roleMap[inputValue] = []
+          this.roleMapOptions[inputValue] = this.roleOptions
+        })
       }
       this.inputVisible = false
       this.inputValue = ''
     },
-    handleChangeOptions() {
+    handleChangeOptions(role) {
+      grantPermissionsOnCluster(this.getCurrentCluster(), this.postForm.persistent, this.getFullTopic(), role, this.roleMap[role]).then(response => {
+        this.$notify({
+          title: 'success',
+          message: this.$i18n.t('namespace.notification.addRoleActionsSuccess'),
+          type: 'success',
+          duration: 3000
+        })
+      })
       this.$forceUpdate()
     },
     getFullTopic() {
@@ -984,6 +1016,12 @@
 </script>
 
 <style>
+.role-el-tag {
+  background-color: #fff !important;
+  border: none !important;
+  font-size: 16px !important;
+  color: black !important;
+}
 .split-line {
   background: #e6e9f3;
   border: none;
diff --git a/gradle.properties b/gradle.properties
index 5e3de66..2c8f261 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,6 +2,8 @@
 springMybatisVersion=1.3.2
 javaxValidationVersion=2.0.0.Final
 jsonWebTokenVersion=0.9.0
+jsonWebTokenApiVersion=0.10.5
+jsonWebTokenImplVersion=0.10.5
 sqliteVersion=3.21.0.1
 lombokVersion=1.18.10
 pageHelperVersion=1.2.4
@@ -15,3 +17,4 @@
 gsonVersion=2.8.2
 postgresqlVersion=42.2.5
 herddbVersion=0.12.2
+brokerVersion=2.4.1
diff --git a/src/README.md b/src/README.md
index ac4357e..4700e40 100644
--- a/src/README.md
+++ b/src/README.md
@@ -44,3 +44,35 @@
 ```
 ./gradlew -x build -x test
 ```
+
+### Enable JWT Auth
+
+If you want to turn on JWT authentication, configure the following parameters:
+
+* backend.jwt.token  token for the superuser. You need to configure this parameter during cluster initialization.
+* jwt.broker.token.mode  Two modes of generating token, SECRET and PRIVATE.
+* jwt.broker.public.key Configure this option if you are using the PRIVATE mode.
+* jwt.broker.private.key Configure this option if you are using the PRIVATE mode.
+* jwt.broker.secret.key Configure this option if you are using the SECRET mode.
+
+For more information, see [Apache Pulsar](http://pulsar.apache.org/docs/en/security-token-admin/)
+
+* Method 1: Use command-line tool
+
+```
+java -jar ./build/libs/pulsar-manager.jar --redirect.host=http://localhost --redirect.port=9527 insert.stats.interval=600000 --backend.jwt.token=token --jwt.broker.token.mode=PRIVATE --jwt.broker.private.key=file:///path/broker-private.key --jwt.broker.public.key=file:///path/broker-public.key
+```
+
+* Method 2. Configure the application.properties file
+
+```
+backend.jwt.token=token
+
+jwt.broker.token.mode=PRIVATE
+jwt.broker.public.key=file:///path/broker-public.key
+jwt.broker.private.key=file:///path/broker-private.key
+
+or 
+jwt.broker.token.mode=SECRET
+jwt.broker.secret.key=file:///path/broker-secret.key
+```
\ No newline at end of file
diff --git a/src/main/java/io/streamnative/pulsar/manager/controller/BrokerTokensController.java b/src/main/java/io/streamnative/pulsar/manager/controller/BrokerTokensController.java
new file mode 100644
index 0000000..6af6890
--- /dev/null
+++ b/src/main/java/io/streamnative/pulsar/manager/controller/BrokerTokensController.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.controller;
+
+import com.github.pagehelper.Page;
+import com.google.common.collect.Maps;
+import io.streamnative.pulsar.manager.entity.BrokerTokenEntity;
+import io.streamnative.pulsar.manager.entity.BrokerTokensRepository;
+import io.streamnative.pulsar.manager.service.JwtService;
+import io.swagger.annotations.*;
+import org.hibernate.validator.constraints.Range;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.constraints.Min;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Broker tokens controller for broker token create, delete, update and search
+ */
+@RequestMapping(value = "/pulsar-manager")
+@Api(description = "Support more management token.")
+@Validated
+@RestController
+public class BrokerTokensController {
+
+    @Autowired
+    private JwtService jwtService;
+
+    @Autowired
+    private BrokerTokensRepository brokerTokensRepository;
+
+    @ApiOperation(value = "Get the list of existing broker tokens, support paging, the default is 10 per page")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/tokens", method =  RequestMethod.GET)
+    public ResponseEntity<Map<String, Object>> getEnvironmentsList(
+            @ApiParam(value = "page_num", defaultValue = "1", example = "1")
+            @RequestParam(name = "page_num", defaultValue = "1")
+            @Min(value = 1, message = "page_num is incorrect, should be greater than 0.")
+                    Integer pageNum,
+            @ApiParam(value = "page_size", defaultValue = "10", example = "10")
+            @RequestParam(name="page_size", defaultValue = "10")
+            @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
+                    Integer pageSize) {
+        Page<BrokerTokenEntity> brokerTokenEntityPage = brokerTokensRepository.getBrokerTokensList(pageNum, pageSize);
+        Map<String, Object> result = Maps.newHashMap();
+        result.put("total", brokerTokenEntityPage.getTotal());
+        result.put("data", brokerTokenEntityPage);
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Add token for connect broker")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/tokens/token", method = RequestMethod.PUT)
+    public ResponseEntity<Map<String, Object>> addBrokerToken(@RequestBody BrokerTokenEntity brokerTokenEntity) {
+        Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole());
+        Map<String, Object> result = Maps.newHashMap();
+        if (optionalBrokerTokenEntity.isPresent()) {
+            result.put("error", "Role is exist");
+            return ResponseEntity.ok(result);
+        }
+        String token = jwtService.createBrokerToken(brokerTokenEntity.getRole(), null);
+        if (token == null) {
+            result.put("error", "Token gennerate failed");
+            return ResponseEntity.ok(result);
+        }
+        brokerTokenEntity.setToken(token);
+        long brokerTokenId = brokerTokensRepository.save(brokerTokenEntity);
+        if (brokerTokenId <= 0) {
+            result.put("error", "Token save Failed");
+            return ResponseEntity.ok(result);
+        }
+        result.put("message", "Token generate and save success");
+        result.put("tokenId", brokerTokenId);
+        result.put("token", token);
+        return ResponseEntity.ok(result);
+    }
+
+
+    @ApiOperation(value = "Update token for connect broker")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/tokens/token", method = RequestMethod.POST)
+    public ResponseEntity<Map<String, Object>> updateBrokerToken(@RequestBody BrokerTokenEntity brokerTokenEntity) {
+        Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole());
+        Map<String, Object> result = Maps.newHashMap();
+        if (!optionalBrokerTokenEntity.isPresent()) {
+            result.put("error", "Role is not exist");
+            return ResponseEntity.ok(result);
+        }
+        BrokerTokenEntity getBrokerTokenEntity = optionalBrokerTokenEntity.get();
+        brokerTokenEntity.setToken(getBrokerTokenEntity.getToken());
+        brokerTokensRepository.update(brokerTokenEntity);
+        result.put("message", "Token generate and save success");
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Get token by role")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/tokens/{role}", method = RequestMethod.GET)
+    public ResponseEntity<Map<String, Object>> getBrokerToken(@PathVariable String role) {
+        Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(role);
+        Map<String, Object> result = Maps.newHashMap();
+        if (!optionalBrokerTokenEntity.isPresent()) {
+            result.put("error", "Token not find");
+            return ResponseEntity.ok(result);
+        }
+        BrokerTokenEntity brokerTokenEntity = optionalBrokerTokenEntity.get();
+        result.put("token", brokerTokenEntity.getToken());
+        result.put("description", brokerTokenEntity.getDescription());
+        result.put("role", brokerTokenEntity.getRole());
+        result.put("tokenId", brokerTokenEntity.getTokenId());
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Delete token")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/tokens/{role}", method = RequestMethod.DELETE)
+    public ResponseEntity<Map<String, Object>> deleteBrokerToken(@PathVariable String role) {
+        Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(role);
+        Map<String, Object> result = Maps.newHashMap();
+        if (!optionalBrokerTokenEntity.isPresent()) {
+            result.put("error", "Token not find");
+            return ResponseEntity.ok(result);
+        }
+        brokerTokensRepository.remove(role);
+        result.put("message", "Delete broker token success");
+        result.put("role", role);
+        return ResponseEntity.ok(result);
+    }
+}
diff --git a/src/main/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImpl.java b/src/main/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImpl.java
new file mode 100644
index 0000000..50218bb
--- /dev/null
+++ b/src/main/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImpl.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.dao;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import io.streamnative.pulsar.manager.entity.BrokerTokenEntity;
+import io.streamnative.pulsar.manager.entity.BrokerTokensRepository;
+import io.streamnative.pulsar.manager.mapper.BrokerTokensMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public class BrokerTokensRepositoryImpl implements BrokerTokensRepository {
+
+    private final BrokerTokensMapper brokerTokensMapper;
+
+    @Autowired
+    public BrokerTokensRepositoryImpl(BrokerTokensMapper brokerTokensMapper) {
+        this.brokerTokensMapper = brokerTokensMapper;
+    }
+
+    public long save(BrokerTokenEntity brokerTokenEntity) {
+        brokerTokensMapper.save(brokerTokenEntity);
+        return brokerTokenEntity.getTokenId();
+    }
+
+    public void update(BrokerTokenEntity brokerTokenEntity) {
+        brokerTokensMapper.update(brokerTokenEntity);
+    }
+
+    public void remove(String role) {
+        brokerTokensMapper.delete(role);
+    }
+
+    public Optional<BrokerTokenEntity> findTokenByRole(String role) {
+        return Optional.ofNullable(brokerTokensMapper.findTokenByRole(role));
+    }
+
+    public Page<BrokerTokenEntity> getBrokerTokensList(Integer pageNum, Integer pageSize) {
+        PageHelper.startPage(pageNum, pageSize);
+        return brokerTokensMapper.findBrokerTokensList();
+    }
+}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokenEntity.java b/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokenEntity.java
new file mode 100644
index 0000000..680e412
--- /dev/null
+++ b/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokenEntity.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.entity;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Token entity for auth from client to broker.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class BrokerTokenEntity {
+    private long tokenId;
+    private String role;
+    private String description;
+    private String token;
+}
diff --git a/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokensRepository.java b/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokensRepository.java
new file mode 100644
index 0000000..a2e392b
--- /dev/null
+++ b/src/main/java/io/streamnative/pulsar/manager/entity/BrokerTokensRepository.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.entity;
+
+import com.github.pagehelper.Page;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public interface BrokerTokensRepository {
+
+    long save(BrokerTokenEntity brokerTokenEntity);
+
+    void update(BrokerTokenEntity brokerTokenEntity);
+
+    void remove(String role);
+
+    Optional<BrokerTokenEntity> findTokenByRole(String role);
+
+    Page<BrokerTokenEntity> getBrokerTokensList(Integer pageNum, Integer pageSize);
+}
diff --git a/src/main/java/io/streamnative/pulsar/manager/mapper/BrokerTokensMapper.java b/src/main/java/io/streamnative/pulsar/manager/mapper/BrokerTokensMapper.java
new file mode 100644
index 0000000..cce260e
--- /dev/null
+++ b/src/main/java/io/streamnative/pulsar/manager/mapper/BrokerTokensMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.mapper;
+
+import com.github.pagehelper.Page;
+import io.streamnative.pulsar.manager.entity.BrokerTokenEntity;
+import org.apache.ibatis.annotations.*;
+
+/**
+ * Broker Tokens Mapper
+ */
+@Mapper
+public interface BrokerTokensMapper {
+
+    @Insert("INSERT INTO tokens(role, description, token) VALUES(#{role},#{description},#{token})")
+    @Options(useGeneratedKeys=true, keyProperty="tokenId", keyColumn="token_id")
+    long save(BrokerTokenEntity brokerTokenEntity);
+
+    @Update("UPDATE tokens set token=#{token}, description=#{description} where role=#{role}")
+    void update(BrokerTokenEntity brokerTokenEntity);
+
+    @Select("select token_id as tokenId, role, description, token FROM tokens where role=#{role}")
+    BrokerTokenEntity findTokenByRole(String role);
+
+    @Select("SELECT token_id as tokenId, role, description FROM tokens")
+    Page<BrokerTokenEntity> findBrokerTokensList();
+
+    @Delete("DELETE FROM tokens WHERE role=#{role}")
+    void delete(String role);
+}
diff --git a/src/main/java/io/streamnative/pulsar/manager/service/JwtService.java b/src/main/java/io/streamnative/pulsar/manager/service/JwtService.java
index f46b6b6..da953b4 100644
--- a/src/main/java/io/streamnative/pulsar/manager/service/JwtService.java
+++ b/src/main/java/io/streamnative/pulsar/manager/service/JwtService.java
@@ -13,8 +13,10 @@
  */
 package io.streamnative.pulsar.manager.service;
 
+import io.jsonwebtoken.Claims;
 import org.springframework.stereotype.Service;
 
+import java.security.Key;
 import java.util.Optional;
 
 public interface JwtService {
@@ -23,6 +25,10 @@
 
     Optional<String> getSubFromToken(String token);
 
+    String createBrokerToken(String role, String expiryTime);
+
+    Claims validateBrokerToken(String token);
+
     void setToken(String key, String value);
 
     String getToken(String key);
diff --git a/src/main/java/io/streamnative/pulsar/manager/service/impl/JwtServiceImpl.java b/src/main/java/io/streamnative/pulsar/manager/service/impl/JwtServiceImpl.java
index 49571cf..51cc684 100644
--- a/src/main/java/io/streamnative/pulsar/manager/service/impl/JwtServiceImpl.java
+++ b/src/main/java/io/streamnative/pulsar/manager/service/impl/JwtServiceImpl.java
@@ -13,33 +13,49 @@
  */
 package io.streamnative.pulsar.manager.service.impl;
 
+import io.jsonwebtoken.*;
+import io.jsonwebtoken.security.Keys;
 import io.streamnative.pulsar.manager.service.JwtService;
 
-import io.jsonwebtoken.SignatureAlgorithm;
-import io.jsonwebtoken.Claims;
-import io.jsonwebtoken.Jws;
-import io.jsonwebtoken.Jwts;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
+import java.security.Key;
 import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
+@Slf4j
 @Service
 public class JwtServiceImpl implements JwtService {
 
-    private String secret;
     private int sessionTime;
 
+    @Value("${jwt.broker.token.mode}")
+    private String jwtBrokerTokenMode;
+
+    @Value("${jwt.broker.secret.key}")
+    private String jwtBrokerSecretKey;
+
+    @Value("${jwt.broker.private.key}")
+    private String jwtBrokerPrivateKey;
+
+    @Value("${jwt.broker.public.key}")
+    private String jwtBrokerPublicKey;
+
     private final Map<String, String> tokens = new ConcurrentHashMap<>();
 
+    private final Key key = Keys.secretKeyFor(SignatureAlgorithm.HS256);
+
     @Autowired
-    public JwtServiceImpl(@Value("${jwt.secret}") String secret,
-                             @Value("${jwt.sessionTime}") int sessionTime) {
-        this.secret = secret;
+    public JwtServiceImpl(@Value("${jwt.sessionTime}") int sessionTime) {
         this.sessionTime = sessionTime;
     }
 
@@ -48,14 +64,14 @@
         return Jwts.builder()
                 .setSubject(id)
                 .setExpiration(expireTimeFromNow())
-                .signWith(SignatureAlgorithm.HS512, secret)
+                .signWith(key)
                 .compact();
     }
 
     @Override
     public Optional<String> getSubFromToken(String token) {
         try {
-            Jws<Claims> claimsJws = Jwts.parser().setSigningKey(secret).parseClaimsJws(token);
+            Jws<Claims> claimsJws = Jwts.parser().setSigningKey(key).parseClaimsJws(token);
             return Optional.ofNullable(claimsJws.getBody().getSubject());
         } catch (Exception e) {
             return Optional.empty();
@@ -84,4 +100,67 @@
             this.tokens.remove(key);
         }
     }
+
+    private Key decodeBySecretKey() {
+        try {
+            byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(jwtBrokerSecretKey);
+            Key signingKey = AuthTokenUtils.decodeSecretKey(encodedKey);
+            return signingKey;
+        } catch (IOException e) {
+            log.error("Decode failed by secrete key, error: {}", e.getMessage());
+            return null;
+        }
+    }
+
+    public String createBrokerToken(String role, String expiryTime) {
+        Key signingKey;
+        if (jwtBrokerTokenMode.equals("SECRET")) {
+            signingKey = decodeBySecretKey();
+        } else if (jwtBrokerTokenMode.equals("PRIVATE")){
+            signingKey = decodeByPrivateKey();
+        } else {
+            log.info("Default disable JWT auth, please set jwt.broker.token.mode.");
+            return null;
+        }
+        if (signingKey == null) {
+            log.error("JWT Auth failed, signingKey is not empty");
+            return null;
+        }
+        Optional<Date> optExpiryTime = Optional.empty();
+        if (expiryTime != null) {
+            long relativeTimeMillis = TimeUnit.SECONDS
+                    .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime));
+            optExpiryTime = Optional.of(new Date(System.currentTimeMillis() + relativeTimeMillis));
+        }
+        String token = AuthTokenUtils.createToken(signingKey, role, optExpiryTime);
+        return token;
+    }
+
+    private Key decodeByPrivateKey() {
+        try {
+            byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(jwtBrokerPrivateKey);
+            SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
+            Key signingKey = AuthTokenUtils.decodePrivateKey(encodedKey, algorithm);
+            return signingKey;
+        } catch (IOException e) {
+            log.error("Decode failed by private key, error: {}", e.getMessage());
+            return null;
+        }
+    }
+
+    public Claims validateBrokerToken(String token) {
+        Key validationKey;
+        if (jwtBrokerTokenMode.equals("SECRET")) {
+            validationKey = decodeBySecretKey();
+        } else if (jwtBrokerTokenMode.equals("PRIVATE")){
+            validationKey = decodeByPrivateKey();
+        } else {
+            log.info("Default disable JWT auth, please set jwt.broker.token.mode.");
+            return null;
+        }
+        Jwt<?, Claims> jwt = Jwts.parser()
+                .setSigningKey(validationKey)
+                .parse(token);
+        return jwt.getBody();
+    }
 }
diff --git a/src/main/resources/META-INF/sql/herddb-schema.sql b/src/main/resources/META-INF/sql/herddb-schema.sql
index 73f054a..61d7469 100644
--- a/src/main/resources/META-INF/sql/herddb-schema.sql
+++ b/src/main/resources/META-INF/sql/herddb-schema.sql
@@ -106,3 +106,10 @@
   time_stamp BIGINT,
   metadata text
 );
+
+CREATE TABLE IF NOT EXISTS tokens (
+  token_id LONG PRIMARY KEY AUTO_INCREMENT,
+  role varchar(256) NOT NULL,
+  description varchar(128),
+  token varchar(1024)
+);
diff --git a/src/main/resources/META-INF/sql/mysql-schema.sql b/src/main/resources/META-INF/sql/mysql-schema.sql
index 4e6ccd6..ba22fbb 100644
--- a/src/main/resources/META-INF/sql/mysql-schema.sql
+++ b/src/main/resources/META-INF/sql/mysql-schema.sql
@@ -116,3 +116,10 @@
   metadata text
 )ENGINE=InnoDB CHARACTER SET utf8;
 
+CREATE TABLE IF NOT EXISTS tokens (
+  token_id BIGINT PRIMARY KEY AUTO_INCREMENT,
+  role varchar(256) NOT NULL,
+  description varchar(128),
+  token varchar(1024),
+  UNIQUE (role)
+)ENGINE=InnoDB CHARACTER SET utf8;
\ No newline at end of file
diff --git a/src/main/resources/META-INF/sql/postgresql-schema.sql b/src/main/resources/META-INF/sql/postgresql-schema.sql
index 8048bb3..4f4ccf4 100644
--- a/src/main/resources/META-INF/sql/postgresql-schema.sql
+++ b/src/main/resources/META-INF/sql/postgresql-schema.sql
@@ -115,3 +115,11 @@
   time_stamp BIGINT,
   metadata text
 );
+
+CREATE TABLE IF NOT EXISTS tokens (
+  token_id BIGSERIAL PRIMARY KEY,
+  role varchar(256) NOT NULL,
+  description varchar(128),
+  token varchar(1024) NOT NUll,
+  UNIQUE (role)
+);
\ No newline at end of file
diff --git a/src/main/resources/META-INF/sql/sqlite-schema.sql b/src/main/resources/META-INF/sql/sqlite-schema.sql
index 674a032..7e6aa09 100644
--- a/src/main/resources/META-INF/sql/sqlite-schema.sql
+++ b/src/main/resources/META-INF/sql/sqlite-schema.sql
@@ -111,3 +111,13 @@
   time_stamp integer,
   metadata text
 );
+
+CREATE TABLE IF NOT EXISTS tokens (
+  token_id integer PRIMARY KEY AUTOINCREMENT,
+  role varchar(256) NOT NULL,
+  description varchar(128),
+  token varchar(1024) NOT NULL,
+  UNIQUE (role)
+);
+
+
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 08f4199..1c9f766 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -34,7 +34,7 @@
 #spring.datasource.username=
 #spring.datasource.password=
 
-# HerdDB JDBC Driver
+#HerdDB JDBC Driver
 spring.datasource.driver-class-name=herddb.jdbc.Driver
 # HerdDB - local in memory-only
 #spring.datasource.url=jdbc:herddb:local
@@ -79,6 +79,15 @@
 pulsar-manager.account=pulsar
 pulsar-manager.password=pulsar
 
+# Optional -> SECRET, PRIVATE, default -> PRIVATE, empty -> disable auth
+# SECRET mode -> bin/pulsar tokens create --secret-key file:///path/to/my-secret.key --subject test-user
+# PRIVATE mode -> bin/pulsar tokens create --private-key file:///path/to/my-private.key --subject test-user
+# Detail information: http://pulsar.apache.org/docs/en/security-token-admin/
+jwt.broker.token.mode=
+jwt.broker.secret.key=file:///path/broker-secret.key
+jwt.broker.public.key=file:///path/pulsar/broker-public.key
+jwt.broker.private.key=file:///path/broker-private.key
+
 # bookie
 bookie.host=http://localhost:8050
 bookie.enable=false
diff --git a/src/test/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java b/src/test/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
new file mode 100644
index 0000000..1c4317f
--- /dev/null
+++ b/src/test/java/io/streamnative/pulsar/manager/dao/BrokerTokensRepositoryImplTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.dao;
+
+import com.github.pagehelper.Page;
+import io.streamnative.pulsar.manager.PulsarManagerApplication;
+import io.streamnative.pulsar.manager.entity.BrokerTokenEntity;
+import io.streamnative.pulsar.manager.entity.BrokerTokensRepository;
+import io.streamnative.pulsar.manager.profiles.HerdDBTestProfile;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Optional;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+        classes = {
+                PulsarManagerApplication.class,
+                HerdDBTestProfile.class
+        }
+)
+@ActiveProfiles("test")
+public class BrokerTokensRepositoryImplTest {
+
+    @Autowired
+    private BrokerTokensRepository brokerTokensRepository;
+
+    @Test
+    public void brokerTokenTest() {
+        BrokerTokenEntity brokerTokenEntity = new BrokerTokenEntity();
+        brokerTokenEntity.setRole("test");
+        brokerTokenEntity.setDescription("This role for test");
+        brokerTokenEntity.setToken("xxxxxxxxxxxxx");
+        brokerTokensRepository.save(brokerTokenEntity);
+        Page<BrokerTokenEntity> brokerTokenEntityPage = brokerTokensRepository.getBrokerTokensList(1, 1);
+        brokerTokenEntityPage.count(true);
+        brokerTokenEntityPage.getResult().forEach((result) -> {
+            Assert.assertEquals(result.getRole(), brokerTokenEntity.getRole());
+            Assert.assertEquals(result.getDescription(), brokerTokenEntity.getDescription());
+        });
+
+        brokerTokenEntity.setDescription("This role for update test");
+        brokerTokenEntity.setToken("tokentestupdate");
+        brokerTokensRepository.update(brokerTokenEntity);
+        Optional<BrokerTokenEntity> optionalBrokerTokenEntity = brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole());
+        BrokerTokenEntity updatedBrokerTokenEntity = optionalBrokerTokenEntity.get();
+        Assert.assertEquals(updatedBrokerTokenEntity.getRole(), brokerTokenEntity.getRole());
+        Assert.assertEquals(updatedBrokerTokenEntity.getDescription(), brokerTokenEntity.getDescription());
+
+        brokerTokensRepository.remove(brokerTokenEntity.getRole());
+        Assert.assertFalse(brokerTokensRepository.findTokenByRole(brokerTokenEntity.getRole()).isPresent());
+    }
+}
diff --git a/src/test/java/io/streamnative/pulsar/manager/service/BrokerTokensServiceImplTest.java b/src/test/java/io/streamnative/pulsar/manager/service/BrokerTokensServiceImplTest.java
new file mode 100644
index 0000000..bb53f2e
--- /dev/null
+++ b/src/test/java/io/streamnative/pulsar/manager/service/BrokerTokensServiceImplTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.manager.service;
+
+import io.jsonwebtoken.Claims;
+import io.streamnative.pulsar.manager.PulsarManagerApplication;
+import io.streamnative.pulsar.manager.profiles.HerdDBTestProfile;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(SpringRunner.class)
+@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"})
+@TestPropertySource(
+        properties = {
+                "jwt.broker.token.mode=SECRET",
+                "jwt.broker.secret.key=data:base64,cxkc+xPbcF/3E49I1HE4BJKtAwZ/FUER1h7wXk7qkLw="
+        }
+)
+@SpringBootTest(
+        classes = {
+                PulsarManagerApplication.class,
+                HerdDBTestProfile.class
+        }
+)
+@ActiveProfiles("test")
+public class BrokerTokensServiceImplTest {
+
+    @Autowired
+    private JwtService jwtService;
+
+    @Test
+    public void createBrokerTokenTest() {
+        String role = "test";
+        String token = jwtService.createBrokerToken(role, null);
+        Claims jwtBody = jwtService.validateBrokerToken(token);
+        Assert.assertEquals(jwtBody.getSubject(), role);
+    }
+}