Support environment change (#125)

* ### Motivation
Support dynamic change environment

### Modifications
* Add environment page
* Add environment rest api
* Front end support add, remove environment
* Support redirect and forward request
* Add unit test for backend end
diff --git a/front-end/src/api/environments.js b/front-end/src/api/environments.js
new file mode 100644
index 0000000..7f1c7f1
--- /dev/null
+++ b/front-end/src/api/environments.js
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import request from '@/utils/request'
+
+const SPRING_BASE_URL = '/pulsar-manager'
+
+export function fetchEnvironments(query) {
+  return request({
+    url: SPRING_BASE_URL + '/environments',
+    method: 'get',
+    params: { query }
+  })
+}
+
+export function putEnvironment(data) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + '/environments/environment',
+    method: 'put',
+    data
+  })
+}
+
+export function updateEnvironment(data) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + '/environments/environment',
+    method: 'post',
+    data
+  })
+}
+
+export function deleteEnvironment(data) {
+  return request({
+    headers: { 'Content-Type': 'application/json' },
+    url: SPRING_BASE_URL + '/environments/environment',
+    method: 'delete',
+    data
+  })
+}
diff --git a/front-end/src/router/index.js b/front-end/src/router/index.js
index f96db34..78967b0 100644
--- a/front-end/src/router/index.js
+++ b/front-end/src/router/index.js
@@ -70,6 +70,11 @@
     hidden: true
   },
   {
+    path: '/environments',
+    component: () => import('@/views/management/environments/index'),
+    hidden: true
+  },
+  {
     path: '',
     component: Layout,
     redirect: 'management/tenants',
diff --git a/front-end/src/store/modules/user.js b/front-end/src/store/modules/user.js
index 4ebcb1a..f3e3f5e 100644
--- a/front-end/src/store/modules/user.js
+++ b/front-end/src/store/modules/user.js
@@ -13,6 +13,7 @@
  */
 import { loginByUsername, logout, getUserInfo } from '@/api/login'
 import { getToken, setToken, removeToken } from '@/utils/auth'
+import { removeEnvironment } from '@/utils/environment'
 
 const user = {
   state: {
@@ -96,6 +97,7 @@
           commit('SET_TOKEN', '')
           commit('SET_ROLES', [])
           removeToken()
+          removeEnvironment()
           resolve()
         }).catch(error => {
           reject(error)
@@ -108,10 +110,17 @@
       return new Promise(resolve => {
         commit('SET_TOKEN', '')
         removeToken()
+        removeEnvironment()
         resolve()
       })
     },
 
+    SetEnvironment({ commit }, environment) {
+      return new Promise(resolve => {
+        commit('ENVIRONMENT', environment)
+      })
+    },
+
     // 动态修改权限
     ChangeRoles({ commit, dispatch }, role) {
       return new Promise(resolve => {
diff --git a/front-end/src/utils/environment.js b/front-end/src/utils/environment.js
new file mode 100644
index 0000000..906c9d7
--- /dev/null
+++ b/front-end/src/utils/environment.js
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Cookies from 'js-cookie'
+
+const EnvironmentKey = 'Admin-Environment'
+
+export function getEnvironment() {
+  return Cookies.get(EnvironmentKey)
+}
+
+export function setEnvironment(environment) {
+  return Cookies.set(EnvironmentKey, environment)
+}
+
+export function removeEnvironment() {
+  return Cookies.remove(EnvironmentKey)
+}
diff --git a/front-end/src/utils/request.js b/front-end/src/utils/request.js
index a1f4a0e..efa76a2 100644
--- a/front-end/src/utils/request.js
+++ b/front-end/src/utils/request.js
@@ -16,6 +16,8 @@
 import { Message } from 'element-ui'
 import store from '@/store'
 import { getToken } from '@/utils/auth'
+import { getEnvironment } from '@/utils/environment'
+import router from '../router'
 
 // create an axios instance
 const service = axios.create({
@@ -30,6 +32,7 @@
     if (store.getters.token) {
       config.headers['token'] = getToken()
     }
+    config.headers['environment'] = getEnvironment()
     return config
   },
   error => {
@@ -71,6 +74,11 @@
           location.reload()
         })
       }
+    } else if (error.response.status === 400) {
+      if (error.response.data.hasOwnProperty('message') && error.response.data.message.indexOf('no active environment') > 0) {
+        router.replace({ path: '/environments' })
+        return
+      }
     } else {
       message = error.response.data.reason
     }
diff --git a/front-end/src/views/layout/components/Navbar.vue b/front-end/src/views/layout/components/Navbar.vue
index 3010da4..619f32d 100644
--- a/front-end/src/views/layout/components/Navbar.vue
+++ b/front-end/src/views/layout/components/Navbar.vue
@@ -5,6 +5,17 @@
     <breadcrumb class="breadcrumb-container"/>
 
     <div class="right-menu">
+      <el-dropdown @command="handleCommand">
+        <span class="el-dropdown-link">
+          Environment<i class="el-icon-arrow-down el-icon--right"/>
+        </span>
+        <el-dropdown-menu slot="dropdown">
+          <el-dropdown-item v-for="(item,index) in environmentsListOptions" :command="item" :key="index" :label="item.label" :value="item.value">
+            {{ item.value }}
+          </el-dropdown-item>
+          <el-dropdown-item command="newEnvironment" divided>New Environment</el-dropdown-item>
+        </el-dropdown-menu>
+      </el-dropdown>
       <template v-if="device!=='mobile'">
         <error-log class="errLog-container right-menu-item"/>
 
@@ -17,19 +28,10 @@
       </template>
 
       <el-dropdown class="avatar-container right-menu-item" trigger="click">
-        <!-- <div class="avatar-wrapper">
-          <img src="@/asserts/avatar/avatar.png" class="user-avatar">
-          <i class="el-icon-caret-bottom"/>
-        </div> -->
         <span class="avatar-wrapper">
           Admin<i class="el-icon-arrow-down el-icon--right"/>
         </span>
         <el-dropdown-menu slot="dropdown">
-          <!-- <router-link to="/">
-            <el-dropdown-item>
-              {{ $t('navbar.dashboard') }}
-            </el-dropdown-item>
-          </router-link> -->
           <a target="_blank" href="https://github.com/apache/pulsar">
             <el-dropdown-item>
               {{ $t('navbar.github') }}
@@ -51,6 +53,8 @@
 import ErrorLog from '@/components/ErrorLog'
 import SizeSelect from '@/components/SizeSelect'
 import LangSelect from '@/components/LangSelect'
+import { fetchEnvironments } from '@/api/environments'
+import { setEnvironment } from '@/utils/environment'
 
 export default {
   components: {
@@ -60,6 +64,14 @@
     SizeSelect,
     LangSelect
   },
+  data() {
+    return {
+      environmentsListOptions: [{
+        'label': 'localhost:8080',
+        'value': 'localhost:8080'
+      }]
+    }
+  },
   computed: {
     ...mapGetters([
       'sidebar',
@@ -68,6 +80,9 @@
       'device'
     ])
   },
+  created() {
+    this.getEnvironmentsList()
+  },
   methods: {
     toggleSideBar() {
       this.$store.dispatch('toggleSideBar')
@@ -76,12 +91,36 @@
       this.$store.dispatch('LogOut').then(() => {
         location.reload()// In order to re-instantiate the vue-router object to avoid bugs
       })
+    },
+    handleCommand(command) {
+      if (command === 'newEnvironment') {
+        this.$router.push({ path: '/environments' })
+        return
+      }
+      setEnvironment(command.value)
+      window.location.reload()
+    },
+    getEnvironmentsList() {
+      fetchEnvironments().then(response => {
+        if (!response.data) return
+        this.environmentsListOptions = []
+        for (var i = 0; i < response.data.data.length; i++) {
+          this.environmentsListOptions.push({
+            'value': response.data.data[i].name,
+            'label': response.data.data[i].broker,
+            'status': response.data.data[i].status
+          })
+        }
+      })
     }
   }
 }
 </script>
 
 <style rel="stylesheet/scss" lang="scss" scoped>
+.el-icon-arrow-down {
+  font-size: 12px;
+}
 .navbar {
   height: 50px;
   line-height: 50px;
diff --git a/front-end/src/views/management/environments/index.vue b/front-end/src/views/management/environments/index.vue
new file mode 100644
index 0000000..00895e2
--- /dev/null
+++ b/front-end/src/views/management/environments/index.vue
@@ -0,0 +1,230 @@
+<template>
+  <div class="app-container">
+    <el-button type="primary" icon="el-icon-plus" @click="handleCreateEnvironment">New Environment</el-button>
+
+    <el-row :gutter="24">
+      <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}" :lg="{span: 24}" :xl="{span: 24}" style="margin-top:15px">
+        <el-table
+          v-loading="environmentListLoading"
+          :key="environmentTableKey"
+          :data="environmentList"
+          border
+          fit
+          highlight-current-row
+          style="width: 100%;">
+          <el-table-column label="Environments" min-width="50px" align="center">
+            <template slot-scope="scope">
+              <router-link :to="'#'" class="link-type" @click.native="handleSetEnvironment(scope.row.environment)">
+                <span>{{ scope.row.environment }}</span>
+              </router-link>
+            </template>
+          </el-table-column>
+          <el-table-column label="broker" align="center" min-width="100px">
+            <template slot-scope="scope">
+              <span>{{ scope.row.broker }}</span>
+            </template>
+          </el-table-column>
+          <el-table-column :label="$t('table.actions')" align="center" class-name="small-padding fixed-width">
+            <template slot-scope="scope">
+              <el-button type="primary" size="mini" @click="handleUpdateEnvironment(scope.row)">{{ $t('table.edit') }}</el-button>
+              <el-button size="mini" type="danger" @click="handleDeleteEnvironment(scope.row)">{{ $t('table.delete') }}</el-button>
+            </template>
+          </el-table-column>
+        </el-table>
+      </el-col>
+    </el-row>
+
+    <el-dialog :title="textMap[dialogStatus]" :visible.sync="dialogFormVisible" width="30%">
+      <el-form ref="form" :rules="rules" :model="form" label-position="top">
+        <el-form-item v-if="dialogStatus==='create'" label="New Environment" prop="environment">
+          <el-input v-model="form.environment" placeholder="Please input name"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='create'" label="New Broker" prop="broker">
+          <el-input v-model="form.broker" placeholder="Please input broker"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='update'" label="Environment">
+          <span>{{ form.environment }}</span>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='update'" label="Broker" prop="broker">
+          <el-input v-model="form.broker" placeholder="Please input broker"/>
+        </el-form-item>
+        <el-form-item v-if="dialogStatus==='delete'">
+          <h4>Are you sure you want to delete this environment?</h4>
+        </el-form-item>
+        <el-form-item>
+          <el-button type="primary" @click="handleOptions()">{{ $t('table.confirm') }}</el-button>
+          <el-button @click="dialogFormVisible = false">{{ $t('table.cancel') }}</el-button>
+        </el-form-item>
+      </el-form>
+    </el-dialog>
+  </div>
+</template>
+
+<script>
+import { putEnvironment, fetchEnvironments, deleteEnvironment, updateEnvironment } from '@/api/environments'
+import { setEnvironment } from '@/utils/environment'
+
+export default {
+  name: 'EnvironmentInfo',
+  data() {
+    return {
+      environmentList: [],
+      environmentTableKey: 0,
+      environmentListLoading: false,
+      textMap: {
+        create: 'New Environment',
+        delete: 'Delete Environment',
+        update: 'Update Environemnt'
+      },
+      dialogFormVisible: false,
+      dialogStatus: '',
+      form: {
+        environment: '',
+        broker: ''
+      },
+      temp: {
+        'name': '',
+        'broker': ''
+      },
+      rules: {
+        environment: [{ required: true, message: 'Environment Name is required', trigger: 'blur' }],
+        broker: [{ required: true, message: 'Broker is required', trigger: 'blur' }]
+      }
+    }
+  },
+  created() {
+    this.getEnvironments()
+  },
+  methods: {
+    getEnvironments() {
+      fetchEnvironments().then(response => {
+        if (!response.data) return
+        this.environmentList = []
+        for (var i = 0; i < response.data.data.length; i++) {
+          this.environmentList.push({
+            'environment': response.data.data[i].name,
+            'broker': response.data.data[i].broker
+          })
+        }
+      })
+    },
+    handleCreateEnvironment() {
+      this.form.environment = ''
+      this.form.broker = ''
+      this.dialogFormVisible = true
+      this.dialogStatus = 'create'
+    },
+    handleDeleteEnvironment(row) {
+      this.temp.name = row.environment
+      this.temp.broker = row.broker
+      this.dialogFormVisible = true
+      this.dialogStatus = 'delete'
+    },
+    handleUpdateEnvironment(row) {
+      this.form.environment = row.environment
+      this.form.broker = row.broker
+      this.dialogFormVisible = true
+      this.dialogStatus = 'update'
+    },
+    handleOptions() {
+      this.$refs['form'].validate((valid) => {
+        if (valid) {
+          switch (this.dialogStatus) {
+            case 'create':
+              this.createEnvironment()
+              break
+            case 'delete':
+              this.deleteEnvironment()
+              break
+            case 'update':
+              this.updateEnvironment()
+              break
+          }
+        }
+      })
+    },
+    createEnvironment() {
+      const data = {
+        'name': this.form.environment,
+        'broker': this.form.broker
+      }
+      putEnvironment(data).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: 'Add environment success',
+          type: 'success',
+          duration: 2000
+        })
+        this.dialogFormVisible = false
+        this.getEnvironments()
+      })
+    },
+    deleteEnvironment() {
+      const data = {
+        'name': this.temp.name,
+        'broker': this.temp.broker
+      }
+      deleteEnvironment(data).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: 'Delete environment success',
+          type: 'success',
+          duration: 2000
+        })
+        this.getEnvironments()
+        this.dialogFormVisible = false
+      })
+    },
+    updateEnvironment() {
+      const data = {
+        'name': this.form.environment,
+        'broker': this.form.broker
+      }
+      updateEnvironment(data).then(response => {
+        if (!response.data) return
+        if (response.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: response.data.error,
+            type: 'error',
+            duration: 2000
+          })
+          return
+        }
+        this.$notify({
+          title: 'success',
+          message: 'Update environment success',
+          type: 'success',
+          duration: 2000
+        })
+        this.getEnvironments()
+        this.dialogFormVisible = false
+      })
+    },
+    handleSetEnvironment(environment) {
+      setEnvironment(environment)
+      this.$router.push({ path: '/management/tenants' })
+    }
+  }
+}
+</script>
diff --git a/front-end/src/views/management/tenants/tenant.vue b/front-end/src/views/management/tenants/tenant.vue
index 5abe29f..bbf9ae7 100644
--- a/front-end/src/views/management/tenants/tenant.vue
+++ b/front-end/src/views/management/tenants/tenant.vue
@@ -116,7 +116,7 @@
       </el-tab-pane>
     </el-tabs>
     <el-dialog :title="textMap[dialogStatus]" :visible.sync="dialogFormVisible" width="30%">
-      <el-form :model="temp" label-position="top">
+      <el-form ref="temp" :model="temp" :rules="rules" label-position="top">
         <el-form-item v-if="dialogStatus==='createNamespace'" :label="$t('table.namespace')" prop="namespace">
           <el-input v-model="temp.namespace" placeholder="Please input namespace"/>
         </el-form-item>
diff --git a/src/main/java/com/manager/pulsar/controller/BrokerStatsController.java b/src/main/java/com/manager/pulsar/controller/BrokerStatsController.java
index 2bbd0fb..77bb952 100644
--- a/src/main/java/com/manager/pulsar/controller/BrokerStatsController.java
+++ b/src/main/java/com/manager/pulsar/controller/BrokerStatsController.java
@@ -13,8 +13,9 @@
  */
 package com.manager.pulsar.controller;
 
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.BrokerStatsService;
-import com.manager.pulsar.service.BrokersService;
+import com.manager.pulsar.utils.EnvironmentTools;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -27,8 +28,7 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.ws.rs.QueryParam;
-import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
 
 /**
  * Broker Stats route foward
@@ -42,6 +42,12 @@
     @Autowired
     private BrokerStatsService brokerStatsService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
     @ApiOperation(value = "Get the broker stats metrics")
     @ApiResponses({
             @ApiResponse(code = 200, message = "ok"),
@@ -50,7 +56,8 @@
     @RequestMapping(value = "/broker-stats/metrics", method =  RequestMethod.GET)
     public ResponseEntity<String> getBrokerStatsMetrics(
             @RequestParam() String broker) {
-        String result = brokerStatsService.forwarBrokerStatsMetrics(broker);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        String result = brokerStatsService.forwarBrokerStatsMetrics(broker, requestHost);
         return ResponseEntity.ok(result);
     }
 
@@ -62,7 +69,8 @@
     @RequestMapping(value = "/broker-stats/topics", method =  RequestMethod.GET)
     public ResponseEntity<String> getBrokerStatsTopics(
             @RequestParam() String broker) {
-        String result = brokerStatsService.forwardBrokerStatsTopics(broker);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        String result = brokerStatsService.forwardBrokerStatsTopics(broker, requestHost);
         return ResponseEntity.ok(result);
     }
 
diff --git a/src/main/java/com/manager/pulsar/controller/BrokersController.java b/src/main/java/com/manager/pulsar/controller/BrokersController.java
index 0b600a3..4f562e7 100644
--- a/src/main/java/com/manager/pulsar/controller/BrokersController.java
+++ b/src/main/java/com/manager/pulsar/controller/BrokersController.java
@@ -13,11 +13,10 @@
  */
 package com.manager.pulsar.controller;
 
-import com.github.pagehelper.Page;
-import com.google.common.collect.Maps;
-import com.manager.pulsar.entity.BrokerEntity;
 import com.manager.pulsar.entity.BrokersRepository;
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.BrokersService;
+import com.manager.pulsar.utils.EnvironmentTools;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -33,10 +32,9 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
-import javax.validation.constraints.Size;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Brokers rest api.
@@ -53,6 +51,12 @@
     @Autowired
     private BrokersRepository brokersRepository;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
     @ApiOperation(value = "Get the list of existing brokers, support paging, the default is 10 per page")
     @ApiResponses({
             @ApiResponse(code = 200, message = "ok"),
@@ -69,7 +73,8 @@
             @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
             Integer pageSize,
             @PathVariable String cluster) {
-        Map<String, Object> result = brokersService.getBrokersList(pageNum, pageSize, cluster);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        Map<String, Object> result = brokersService.getBrokersList(pageNum, pageSize, cluster, requestHost);
         return ResponseEntity.ok(result);
     }
 //
diff --git a/src/main/java/com/manager/pulsar/controller/ClustersController.java b/src/main/java/com/manager/pulsar/controller/ClustersController.java
index fa53782..3febc43 100644
--- a/src/main/java/com/manager/pulsar/controller/ClustersController.java
+++ b/src/main/java/com/manager/pulsar/controller/ClustersController.java
@@ -15,14 +15,25 @@
 
 import com.manager.pulsar.entity.ClusterEntity;
 import com.manager.pulsar.entity.ClustersRepository;
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.ClustersService;
-import io.swagger.annotations.*;
+import com.manager.pulsar.utils.EnvironmentTools;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import org.hibernate.validator.constraints.Range;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
 import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.Size;
 import java.util.Map;
@@ -43,6 +54,12 @@
     @Autowired
     private ClustersService clusterService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
     @ApiOperation(value = "Get the list of existing clusters, support paging, the default is 10 per page")
     @ApiResponses({
             @ApiResponse(code = 200, message = "ok"),
@@ -58,7 +75,8 @@
             @RequestParam(name="page_size", defaultValue = "10")
             @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
                     Integer pageSize) {
-        Map<String, Object> result = clusterService.getClustersList(pageNum, pageSize);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        Map<String, Object> result = clusterService.getClustersList(pageNum, pageSize, requestHost);
 
         return ResponseEntity.ok(result);
     }
diff --git a/src/main/java/com/manager/pulsar/controller/EnvironmentsController.java b/src/main/java/com/manager/pulsar/controller/EnvironmentsController.java
new file mode 100644
index 0000000..7de0d78
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/controller/EnvironmentsController.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.controller;
+
+import com.github.pagehelper.Page;
+import com.google.common.collect.Maps;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+import com.manager.pulsar.utils.HttpUtil;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.hibernate.validator.constraints.Range;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.constraints.Min;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Environments for dynamic change connecton broker.
+ */
+@RequestMapping(value = "/pulsar-manager")
+@Api(description = "Support change environments")
+@Validated
+@RestController
+public class EnvironmentsController {
+
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @ApiOperation(value = "Get the list of existing environments, support paging, the default is 10 per page")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/environments", method =  RequestMethod.GET)
+    public ResponseEntity<Map<String, Object>> getEnvironmentsList(
+            @ApiParam(value = "page_num", defaultValue = "1", example = "1")
+            @RequestParam(name = "page_num", defaultValue = "1")
+            @Min(value = 1, message = "page_num is incorrect, should be greater than 0.")
+            Integer pageNum,
+            @ApiParam(value = "page_size", defaultValue = "10", example = "10")
+            @RequestParam(name="page_size", defaultValue = "10")
+            @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
+            Integer pageSize) {
+        Page<EnvironmentEntity> environmentEntityPage = environmentsRepository.getEnvironmentsList(pageNum, pageSize);
+        Map<String, Object> result = Maps.newHashMap();
+        result.put("total", environmentEntityPage.getTotal());
+        result.put("data", environmentEntityPage);
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Add environment")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/environments/environment", method =  RequestMethod.PUT)
+    public ResponseEntity<Map<String, Object>> addEnvironment(
+            @RequestBody EnvironmentEntity environmentEntity) {
+        Optional<EnvironmentEntity> environmentEntityBrokerOptional = environmentsRepository
+                .findByBroker(environmentEntity.getBroker());
+        Map<String, Object> result = Maps.newHashMap();
+        if (environmentEntityBrokerOptional.isPresent()) {
+            result.put("error", "Broker is exist");
+            return ResponseEntity.ok(result);
+        }
+        if (environmentEntity.getName() == null) {
+            result.put("error", "Environment name is incorrect");
+            return ResponseEntity.ok(result);
+        }
+        Optional<EnvironmentEntity> environmentEntityNameOptional = environmentsRepository
+                .findByName(environmentEntity.getName());
+        if (environmentEntityNameOptional.isPresent()) {
+            result.put("error", "Environment is exist");
+            return ResponseEntity.ok(result);
+        }
+        Map<String, String> header = Maps.newHashMap();
+        header.put("Content-Type", "application/json");
+        String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
+        if (httpTestResult == null) {
+            result.put("error", "This environment is error. Please check it");
+            return ResponseEntity.ok(result);
+        }
+        environmentsRepository.save(environmentEntity);
+        result.put("message", "Add environment success");
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Update environment")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/environments/environment", method =  RequestMethod.POST)
+    public ResponseEntity<Map<String, Object>> updateEnvironment(@RequestBody EnvironmentEntity environmentEntity) {
+        Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository
+                .findByName(environmentEntity.getName());
+        Map<String, Object> result = Maps.newHashMap();
+        if (!environmentEntityOptional.isPresent()) {
+            result.put("error", "Environment no exist");
+            return ResponseEntity.ok(result);
+        }
+        Map<String, String> header = Maps.newHashMap();
+        header.put("Content-Type", "application/json");
+        String httpTestResult = HttpUtil.doGet(environmentEntity.getBroker() + "/metrics", header);
+        if (httpTestResult == null) {
+            result.put("error", "This environment is error. Please check it");
+            return ResponseEntity.ok(result);
+        }
+        environmentsRepository.update(environmentEntity);
+        result.put("message", "Update environment success");
+        return ResponseEntity.ok(result);
+    }
+
+    @ApiOperation(value = "Delete environment")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(value = "/environments/environment", method =  RequestMethod.DELETE)
+    public ResponseEntity<Map<String, Object>> deleteEnvironment(@RequestBody EnvironmentEntity environmentEntity) {
+        Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository
+                .findByName(environmentEntity.getName());
+        Map<String, Object> result = Maps.newHashMap();
+        if (!environmentEntityOptional.isPresent()) {
+            result.put("error", "Environment no exist");
+            return ResponseEntity.ok(result);
+        }
+        environmentsRepository.remove(environmentEntity.getName());
+        result.put("message", "Delete environment success");
+        return ResponseEntity.ok(result);
+    }
+}
diff --git a/src/main/java/com/manager/pulsar/controller/NamespacesController.java b/src/main/java/com/manager/pulsar/controller/NamespacesController.java
index 7dd3dda..5b7d36c 100644
--- a/src/main/java/com/manager/pulsar/controller/NamespacesController.java
+++ b/src/main/java/com/manager/pulsar/controller/NamespacesController.java
@@ -15,10 +15,16 @@
 
 import com.github.pagehelper.Page;
 import com.google.common.collect.Maps;
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.entity.NamespaceEntity;
 import com.manager.pulsar.entity.NamespacesRepository;
 import com.manager.pulsar.service.NamespacesService;
-import io.swagger.annotations.*;
+import com.manager.pulsar.utils.EnvironmentTools;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import org.hibernate.validator.constraints.Range;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
@@ -29,6 +35,7 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.Size;
 import java.util.Map;
@@ -49,6 +56,12 @@
     @Autowired
     private NamespacesService namespacesService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
     @ApiOperation(value = "Get the list of existing namespaces, support paging, the default is 10 per page")
     @ApiResponses({
             @ApiResponse(code = 200, message = "ok"),
@@ -89,7 +102,8 @@
             @RequestParam(name="page_size", defaultValue = "10")
             @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
             Integer pageSize) {
-        Map<String, Object> result = namespacesService.getNamespaceList(pageNum, pageSize, tenantOrNamespace);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        Map<String, Object> result = namespacesService.getNamespaceList(pageNum, pageSize, tenantOrNamespace, requestHost);
         return ResponseEntity.ok(result);
     }
 
diff --git a/src/main/java/com/manager/pulsar/controller/TenantsController.java b/src/main/java/com/manager/pulsar/controller/TenantsController.java
index 85aedd1..e345a72 100644
--- a/src/main/java/com/manager/pulsar/controller/TenantsController.java
+++ b/src/main/java/com/manager/pulsar/controller/TenantsController.java
@@ -13,7 +13,9 @@
  */
 package com.manager.pulsar.controller;
 
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.TenantsService;
+import com.manager.pulsar.utils.EnvironmentTools;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -28,6 +30,7 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import java.util.Map;
 
@@ -43,6 +46,12 @@
   @Autowired
   private TenantsService tenantsService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
   @ApiOperation(value = "Get the list of existing tenants, support paging, the default is 10 per page")
   @ApiResponses({
           @ApiResponse(code = 200, message = "ok"),
@@ -59,7 +68,8 @@
           @RequestParam(name="page_size", defaultValue = "10")
           @Range(min = 1, max = 1000, message = "page_size is incorrect, should be greater than 0 and less than 1000.")
           Integer pageSize) {
-    Map<String, Object> result = tenantsService.getTenantsList(pageNum, pageSize);
+      String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+      Map<String, Object> result = tenantsService.getTenantsList(pageNum, pageSize, requestHost);
     return ResponseEntity.ok(result);
   }
 }
diff --git a/src/main/java/com/manager/pulsar/controller/TopicsController.java b/src/main/java/com/manager/pulsar/controller/TopicsController.java
index d56b9ac..f59fcb3 100644
--- a/src/main/java/com/manager/pulsar/controller/TopicsController.java
+++ b/src/main/java/com/manager/pulsar/controller/TopicsController.java
@@ -13,7 +13,9 @@
  */
 package com.manager.pulsar.controller;
 
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.TopicsService;
+import com.manager.pulsar.utils.EnvironmentTools;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -28,6 +30,7 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.Size;
 import java.util.Map;
@@ -44,6 +47,12 @@
     @Autowired
     private TopicsService topicsService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Autowired
+    private HttpServletRequest request;
+
     @ApiOperation(value = "Query topic info by tenant and namespace and topic")
     @ApiResponses({
             @ApiResponse(code = 200, message = "ok"),
@@ -65,7 +74,8 @@
             @ApiParam(value = "The name of namespace")
             @Size(min = 1, max = 255)
             @PathVariable String namespace) {
-        Map<String, Object> result = topicsService.getTopicsList(pageNum, pageSize, tenant, namespace);
+        String requestHost = EnvironmentTools.getEnvironment(request, environmentsRepository);
+        Map<String, Object> result = topicsService.getTopicsList(pageNum, pageSize, tenant, namespace, requestHost);
         return result;
     }
 }
\ No newline at end of file
diff --git a/src/main/java/com/manager/pulsar/dao/EnvironmentsRepositoryImpl.java b/src/main/java/com/manager/pulsar/dao/EnvironmentsRepositoryImpl.java
new file mode 100644
index 0000000..0076eff
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/dao/EnvironmentsRepositoryImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.dao;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+import com.manager.pulsar.mapper.EnvironmentsMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+/**
+ * EnvironmentsRepositoryImpl implements EnvironmentsRepository for change environment.
+ */
+@Repository
+public class EnvironmentsRepositoryImpl implements EnvironmentsRepository {
+
+    private final EnvironmentsMapper environmentsMapper;
+
+    @Autowired
+    public EnvironmentsRepositoryImpl(EnvironmentsMapper environmentsMapper) {
+        this.environmentsMapper = environmentsMapper;
+    }
+
+    @Override
+    public Optional<EnvironmentEntity> findByBroker(String broker) {
+        return Optional.ofNullable(environmentsMapper.findByBroker(broker));
+    }
+
+    @Override
+    public Optional<EnvironmentEntity> findByName(String name) {
+        return Optional.ofNullable((environmentsMapper.findByName(name)));
+    }
+
+    @Override
+    public void save(EnvironmentEntity environmentEntity) {
+        environmentsMapper.insert(environmentEntity);
+    }
+
+    @Override
+    public Page<EnvironmentEntity> getEnvironmentsList(Integer pageNum, Integer pageSize) {
+        PageHelper.startPage(pageNum, pageSize);
+        return environmentsMapper.findEnvironmentsList();
+    }
+
+    @Override
+    public void remove(String name) {
+        environmentsMapper.delete(name);
+    }
+
+    @Override
+    public void update(EnvironmentEntity environmentEntity) {
+        environmentsMapper.update(environmentEntity);
+    }
+}
diff --git a/src/main/java/com/manager/pulsar/entity/EnvironmentEntity.java b/src/main/java/com/manager/pulsar/entity/EnvironmentEntity.java
new file mode 100644
index 0000000..5bb886d
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/entity/EnvironmentEntity.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.entity;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Environment entity.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class EnvironmentEntity {
+    private String name;
+    private String broker;
+}
diff --git a/src/main/java/com/manager/pulsar/entity/EnvironmentsRepository.java b/src/main/java/com/manager/pulsar/entity/EnvironmentsRepository.java
new file mode 100644
index 0000000..8fa03f7
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/entity/EnvironmentsRepository.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.entity;
+
+import com.github.pagehelper.Page;
+import org.springframework.stereotype.Repository;
+
+import java.util.Optional;
+
+@Repository
+public interface EnvironmentsRepository {
+
+    void save(EnvironmentEntity environmentEntity);
+
+    Optional<EnvironmentEntity> findByBroker(String broker);
+
+    Optional<EnvironmentEntity> findByName(String name);
+
+    Page<EnvironmentEntity> getEnvironmentsList(Integer pageNum, Integer pageSize);
+
+    void remove(String name);
+
+    void update(EnvironmentEntity environmentEntity);
+
+}
diff --git a/src/main/java/com/manager/pulsar/interceptor/AdminHandlerInterceptor.java b/src/main/java/com/manager/pulsar/interceptor/AdminHandlerInterceptor.java
index 44241f3..ca972cb 100644
--- a/src/main/java/com/manager/pulsar/interceptor/AdminHandlerInterceptor.java
+++ b/src/main/java/com/manager/pulsar/interceptor/AdminHandlerInterceptor.java
@@ -15,6 +15,8 @@
 
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.JwtService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.lang.Nullable;
@@ -25,6 +27,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.util.Map;
+import java.util.Optional;
 
 @Component
 public class AdminHandlerInterceptor extends HandlerInterceptorAdapter {
@@ -32,6 +35,9 @@
     @Autowired
     private JwtService jwtService;
 
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
     @Override
     public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
         String token = request.getHeader("token");
@@ -44,6 +50,16 @@
             response.getWriter().append(gson.toJson(map));
             return false;
         }
+        String environment = request.getHeader("environment");
+        Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment);
+        if (!request.getRequestURI().startsWith("/pulsar-manager/environments") && !environmentEntityOptional.isPresent()) {
+            Map<String, Object> map = Maps.newHashMap();
+            map.put("message", "Currently there is no active environment, please set one");
+            Gson gson = new Gson();
+            response.setStatus(400);
+            response.getWriter().append(gson.toJson(map));
+            return false;
+        }
         return true;
     }
 
diff --git a/src/main/java/com/manager/pulsar/mapper/EnvironmentsMapper.java b/src/main/java/com/manager/pulsar/mapper/EnvironmentsMapper.java
new file mode 100644
index 0000000..23c9f1d
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/mapper/EnvironmentsMapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.mapper;
+
+import com.github.pagehelper.Page;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import org.apache.ibatis.annotations.*;
+
+import java.util.Optional;
+
+@Mapper
+public interface EnvironmentsMapper {
+
+    @Insert("INSERT INTO environments(name,broker) VALUES(#{name},#{broker})")
+    void insert(EnvironmentEntity environmentEntity);
+
+    @Select("SELECT name,broker FROM environments where broker=#{broker}")
+    EnvironmentEntity findByBroker(String broker);
+
+    @Select("SELECT name,broker FROM environments where name=#{name}")
+    EnvironmentEntity findByName(String name);
+
+    @Select("SELECT name,broker FROM environments")
+    Page<EnvironmentEntity> findEnvironmentsList();
+
+    @Update("UPDATE environments set broker=#{broker} where name=#{name}")
+    void update(EnvironmentEntity environmentEntity);
+
+    @Delete("DELETE FROM environments WHERE name=#{name}")
+    void delete(String name);
+
+    @Delete("DELETE FROM environments WHERE broker=#{broker}")
+    void deleteByBroker(String broker);
+}
diff --git a/src/main/java/com/manager/pulsar/service/BrokerStatsService.java b/src/main/java/com/manager/pulsar/service/BrokerStatsService.java
index 445d1a4..da23be6 100644
--- a/src/main/java/com/manager/pulsar/service/BrokerStatsService.java
+++ b/src/main/java/com/manager/pulsar/service/BrokerStatsService.java
@@ -15,7 +15,7 @@
 
 public interface BrokerStatsService {
 
-    String forwarBrokerStatsMetrics(String broker);
+    String forwarBrokerStatsMetrics(String broker, String requestHost);
 
-    String forwardBrokerStatsTopics(String broker);
+    String forwardBrokerStatsTopics(String broker, String requestHost);
 }
diff --git a/src/main/java/com/manager/pulsar/service/BrokersService.java b/src/main/java/com/manager/pulsar/service/BrokersService.java
index 397f1ef..0219146 100644
--- a/src/main/java/com/manager/pulsar/service/BrokersService.java
+++ b/src/main/java/com/manager/pulsar/service/BrokersService.java
@@ -17,5 +17,5 @@
 
 public interface BrokersService {
 
-    Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster);
+    Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster, String requestHost);
 }
diff --git a/src/main/java/com/manager/pulsar/service/ClustersService.java b/src/main/java/com/manager/pulsar/service/ClustersService.java
index 35f5565..9d78aa3 100644
--- a/src/main/java/com/manager/pulsar/service/ClustersService.java
+++ b/src/main/java/com/manager/pulsar/service/ClustersService.java
@@ -16,5 +16,5 @@
 import java.util.Map;
 
 public interface ClustersService {
-    Map<String, Object> getClustersList(Integer pageNum, Integer pageSize);
+    Map<String, Object> getClustersList(Integer pageNum, Integer pageSize, String requestHost);
 }
diff --git a/src/main/java/com/manager/pulsar/service/NamespacesService.java b/src/main/java/com/manager/pulsar/service/NamespacesService.java
index 786de06..011959b 100644
--- a/src/main/java/com/manager/pulsar/service/NamespacesService.java
+++ b/src/main/java/com/manager/pulsar/service/NamespacesService.java
@@ -17,6 +17,7 @@
 
 public interface NamespacesService {
 
-    Map<String, Object> getNamespaceList(Integer pageNum, Integer pageSize, String tenant);
+    Map<String, Object> getNamespaceList(
+            Integer pageNum, Integer pageSize, String tenant, String requestHost);
 
 }
\ No newline at end of file
diff --git a/src/main/java/com/manager/pulsar/service/TenantsService.java b/src/main/java/com/manager/pulsar/service/TenantsService.java
index 2aafc7a..6831ba7 100644
--- a/src/main/java/com/manager/pulsar/service/TenantsService.java
+++ b/src/main/java/com/manager/pulsar/service/TenantsService.java
@@ -20,5 +20,6 @@
 @Service
 public interface TenantsService {
 
-    Map<String, Object> getTenantsList(Integer pageNum, Integer pageSize);
+    Map<String, Object> getTenantsList(
+            Integer pageNum, Integer pageSize, String requestHost);
 }
diff --git a/src/main/java/com/manager/pulsar/service/TopicsService.java b/src/main/java/com/manager/pulsar/service/TopicsService.java
index 7949ca6..e9386ff 100644
--- a/src/main/java/com/manager/pulsar/service/TopicsService.java
+++ b/src/main/java/com/manager/pulsar/service/TopicsService.java
@@ -17,5 +17,6 @@
 
 public interface TopicsService {
 
-    Map<String, Object> getTopicsList(Integer pageNum, Integer pageSize, String namespace, String tenant);
+    Map<String, Object> getTopicsList(
+            Integer pageNum, Integer pageSize, String namespace, String tenant, String requestHost);
 }
\ No newline at end of file
diff --git a/src/main/java/com/manager/pulsar/service/impl/BrokerStatsServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/BrokerStatsServiceImpl.java
index 856a8fd..59923e0 100644
--- a/src/main/java/com/manager/pulsar/service/impl/BrokerStatsServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/BrokerStatsServiceImpl.java
@@ -31,21 +31,21 @@
         put("Content-Type","application/json");
     }};
 
-    public String forwarBrokerStatsMetrics(String broker) {
+    public String forwarBrokerStatsMetrics(String broker, String requestHost) {
 
-        broker = checkBroker(broker);
+        broker = checkBroker(broker, requestHost);
         return HttpUtil.doGet(broker + "/admin/v2/broker-stats/metrics", header);
     }
 
-    public String forwardBrokerStatsTopics(String broker) {
+    public String forwardBrokerStatsTopics(String broker, String requestHost) {
 
-        broker = checkBroker(broker);
+        broker = checkBroker(broker, requestHost);
         return HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
     }
 
-    private String checkBroker(String broker) {
+    private String checkBroker(String broker, String requestHost) {
         if (broker == null || broker.length() <= 0) {
-            broker = directRequestHost;
+            broker = requestHost;
         }
 
         if (!broker.startsWith("http")) {
diff --git a/src/main/java/com/manager/pulsar/service/impl/BrokersServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/BrokersServiceImpl.java
index 2c96265..d52b9c5 100644
--- a/src/main/java/com/manager/pulsar/service/impl/BrokersServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/BrokersServiceImpl.java
@@ -31,10 +31,8 @@
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.directRequestHost}")
-    private String directRequestHost;
 
-    public Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster) {
+    public Map<String, Object> getBrokersList(Integer pageNum, Integer pageSize, String cluster, String requestHost) {
         Map<String, Object> brokersMap = Maps.newHashMap();
         List<Map<String, Object>> brokersArray = new ArrayList<>();
         if (directRequestBroker) {
@@ -42,10 +40,10 @@
             Map<String, String> header = Maps.newHashMap();
             header.put("Content-Type", "application/json");
             String failureDomainsResult = HttpUtil.doGet(
-                    directRequestHost + "/admin/v2/clusters/" + cluster + "/failureDomains", header);
+                    requestHost + "/admin/v2/clusters/" + cluster + "/failureDomains", header);
             Map<String, Map<String, List<String>>> failureDomains = gson.fromJson(
                     failureDomainsResult, new TypeToken<Map<String, Map<String, List<String>>>>() {}.getType());
-            String result = HttpUtil.doGet(directRequestHost + "/admin/v2/brokers/" + cluster, header);
+            String result = HttpUtil.doGet(requestHost + "/admin/v2/brokers/" + cluster, header);
             List<String> brokersList = gson.fromJson(result, new TypeToken<List<String>>() {}.getType());
             for (String broker: brokersList) {
                 Map<String, Object> brokerEntity = Maps.newHashMap();
diff --git a/src/main/java/com/manager/pulsar/service/impl/ClustersServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/ClustersServiceImpl.java
index cdf7d83..c6fdae6 100644
--- a/src/main/java/com/manager/pulsar/service/impl/ClustersServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/ClustersServiceImpl.java
@@ -16,6 +16,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
+import com.manager.pulsar.entity.EnvironmentsRepository;
 import com.manager.pulsar.service.BrokersService;
 import com.manager.pulsar.service.ClustersService;
 import com.manager.pulsar.utils.HttpUtil;
@@ -24,6 +25,7 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.servlet.http.HttpServletRequest;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,27 +36,25 @@
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.directRequestHost}")
-    private String directRequestHost;
-
     @Autowired
     private BrokersService brokersService;
 
-    public Map<String, Object> getClustersList(Integer pageNum, Integer pageSize) {
+
+    public Map<String, Object> getClustersList(Integer pageNum, Integer pageSize, String requestHost) {
         Map<String, Object> clustersMap = Maps.newHashMap();
         List<Map<String, Object>> clustersArray = new ArrayList<>();
         if (directRequestBroker) {
             Gson gson = new Gson();
             Map<String, String> header = Maps.newHashMap();
             header.put("Content-Type", "application/json");
-            String result = HttpUtil.doGet(directRequestHost + "/admin/v2/clusters", header);
+            String result = HttpUtil.doGet(requestHost + "/admin/v2/clusters", header);
             List<String> clustersList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
             for (String cluster: clustersList) {
                 Map<String, Object> clusterEntity = Maps.newHashMap();
-                Map<String, Object> brokers = brokersService.getBrokersList(1, 1, cluster);
+                Map<String, Object> brokers = brokersService.getBrokersList(1, 1, cluster, requestHost);
                 clusterEntity.put("brokers", brokers.get("total"));
                 clusterEntity.put("cluster", cluster);
-                String clusterInfo = HttpUtil.doGet(directRequestHost + "/admin/v2/clusters/" + cluster, header);
+                String clusterInfo = HttpUtil.doGet(requestHost + "/admin/v2/clusters/" + cluster, header);
                 ClusterData clusterData = gson.fromJson(clusterInfo, ClusterData.class);
                 clusterEntity.put("serviceUrl", clusterData.getServiceUrl());
                 clusterEntity.put("serviceUrlTls", clusterData.getServiceUrlTls());
diff --git a/src/main/java/com/manager/pulsar/service/impl/NamespacesServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/NamespacesServiceImpl.java
index be74939..a100bcb 100644
--- a/src/main/java/com/manager/pulsar/service/impl/NamespacesServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/NamespacesServiceImpl.java
@@ -33,27 +33,24 @@
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.directRequestHost}")
-    private String directRequestHost;
-
     @Autowired
     private TopicsService topicsService;
 
-    public Map<String, Object> getNamespaceList(Integer pageNum, Integer pageSize, String tenant) {
+    public Map<String, Object> getNamespaceList(Integer pageNum, Integer pageSize, String tenant, String requestHost) {
         Map<String, Object> namespacesMap = Maps.newHashMap();
         List<Map<String, Object>> namespacesArray = new ArrayList<>();
         if (directRequestBroker) {
             Gson gson = new Gson();
             Map<String, String> header = Maps.newHashMap();
             header.put("Content-Type", "application/json");
-            String result = HttpUtil.doGet(directRequestHost + "/admin/v2/namespaces/" + tenant, header);
+            String result = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
             if (result != null) {
                 List<String> namespacesList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
                 for (String tenantNamespace : namespacesList) {
                     String namespace = tenantNamespace.split("/")[1];
                     Map<String, Object> topicsEntity = Maps.newHashMap();
                     Map<String, Object> topics = topicsService.getTopicsList(
-                            0, 0, tenant, namespace);
+                            0, 0, tenant, namespace, requestHost);
                     topicsEntity.put("topics", topics.get("total"));
                     topicsEntity.put("namespace", namespace);
                     namespacesArray.add(topicsEntity);
@@ -67,4 +64,5 @@
         }
         return namespacesMap;
     }
+
 }
\ No newline at end of file
diff --git a/src/main/java/com/manager/pulsar/service/impl/TenantsServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/TenantsServiceImpl.java
index 914f2b7..950e1fe 100644
--- a/src/main/java/com/manager/pulsar/service/impl/TenantsServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/TenantsServiceImpl.java
@@ -16,14 +16,11 @@
 import com.google.common.collect.Maps;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
-import com.manager.pulsar.entity.NamespacesRepository;
-import com.manager.pulsar.entity.TenantsRepository;
 import com.manager.pulsar.service.TenantsService;
 import com.manager.pulsar.utils.HttpUtil;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
@@ -31,7 +28,6 @@
 import java.util.List;
 import java.util.Map;
 
-
 @Service
 public class TenantsServiceImpl implements TenantsService {
 
@@ -41,33 +37,24 @@
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.directRequestHost}")
-    private String directRequestHost;
-
-    @Autowired
-    private TenantsRepository tenantsRepository;
-
-    @Autowired
-    private NamespacesRepository namespacesRepository;
-
-    public Map<String, Object> getTenantsList(Integer pageNum, Integer pageSize) {
+    public Map<String, Object> getTenantsList(Integer pageNum, Integer pageSize, String requestHost) {
         Map<String, Object> tenantsMap = Maps.newHashMap();
         List<Map<String, Object>> tenantsArray = new ArrayList<>();
         if (directRequestBroker) {
             Gson gson = new Gson();
             Map<String, String> header = Maps.newHashMap();
             header.put("Content-Type", "application/json");
-            String result = HttpUtil.doGet( directRequestHost + "/admin/v2/tenants", header);
+            String result = HttpUtil.doGet( requestHost + "/admin/v2/tenants", header);
             if (result != null) {
                 List<String> tenantsList = gson.fromJson(result, new TypeToken<List<String>>(){}.getType());
                 for (String tenant : tenantsList) {
                     Map<String, Object> tenantEntity = Maps.newHashMap();
-                    String info = HttpUtil.doGet( directRequestHost + "/admin/v2/tenants/" + tenant, header);
+                    String info = HttpUtil.doGet( requestHost + "/admin/v2/tenants/" + tenant, header);
                     TenantInfo tenantInfo = gson.fromJson(info, TenantInfo.class);
                     tenantEntity.put("tenant", tenant);
                     tenantEntity.put("adminRoles", String.join(",", tenantInfo.getAdminRoles()));
                     tenantEntity.put("allowedClusters", String.join(",",  tenantInfo.getAllowedClusters()));
-                    String namespace = HttpUtil.doGet(directRequestHost + "/admin/v2/namespaces/" + tenant, header);
+                    String namespace = HttpUtil.doGet(requestHost + "/admin/v2/namespaces/" + tenant, header);
                     if (namespace != null) {
                         List<String> namespacesList = gson.fromJson(namespace, new TypeToken<List<String>>(){}.getType());
                         tenantEntity.put("namespaces", namespacesList.size());
@@ -87,4 +74,5 @@
         }
         return tenantsMap;
     }
+
 }
diff --git a/src/main/java/com/manager/pulsar/service/impl/TopicsServiceImpl.java b/src/main/java/com/manager/pulsar/service/impl/TopicsServiceImpl.java
index 78f3f1e..1fdb4c6 100644
--- a/src/main/java/com/manager/pulsar/service/impl/TopicsServiceImpl.java
+++ b/src/main/java/com/manager/pulsar/service/impl/TopicsServiceImpl.java
@@ -32,11 +32,9 @@
     @Value("${backend.directRequestBroker}")
     private boolean directRequestBroker;
 
-    @Value("${backend.directRequestHost}")
-    private String directRequestHost;
-
     public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
 
+
     private boolean isPartitonedTopic(List<String> topics, String topic) {
         if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
             String[] t = topic.split(PARTITIONED_TOPIC_SUFFIX);
@@ -47,10 +45,13 @@
         return false;
     }
 
-    public Map<String, Object> getTopicsList(Integer pageNum, Integer pageSize, String tenant, String namespace) {
+    public Map<String, Object> getTopicsList(
+            Integer pageNum, Integer pageSize, String tenant, String namespace, String requestHost) {
         Map<String, Object> topicsMap = Maps.newHashMap();
-        List<Map<String, String>> persistentTopic = this.getTopicListByHttp(tenant, namespace, "persistent");
-        List<Map<String, String>> nonPersistentTopic = this.getTopicListByHttp(tenant, namespace, "non-persistent");
+        List<Map<String, String>> persistentTopic = this.getTopicListByHttp(
+                tenant, namespace, "persistent", requestHost);
+        List<Map<String, String>> nonPersistentTopic = this.getTopicListByHttp(
+                tenant, namespace, "non-persistent", requestHost);
         persistentTopic.addAll(nonPersistentTopic);
         topicsMap.put("topics", persistentTopic);
         topicsMap.put("isPage", false);
@@ -60,13 +61,14 @@
         return topicsMap;
     }
 
-    private List<Map<String, String>> getTopicListByHttp(String tenant, String namespace, String persistent) {
+    private List<Map<String, String>> getTopicListByHttp(
+            String tenant, String namespace, String persistent, String requestHost) {
         List<Map<String, String>> topicsArray = new ArrayList<>();
         Map<String, String> header = Maps.newHashMap();
         header.put("Content-Type", "application/json");
         String prefix = "/admin/v2/" + persistent + "/" + tenant + "/" + namespace;
         Gson gson = new Gson();
-        String partitonedUrl = directRequestHost + prefix + "/partitioned";
+        String partitonedUrl = requestHost + prefix + "/partitioned";
         String partitonedTopic = HttpUtil.doGet(partitonedUrl, header);
         List<String> partitionedTopicsList = Arrays.asList();
         Map<String, List<String>> partitionedMap = Maps.newHashMap();
@@ -80,7 +82,7 @@
             }
         }
 
-        String topicUrl = directRequestHost + prefix;
+        String topicUrl = requestHost + prefix;
         String topics = HttpUtil.doGet(topicUrl, header);
         if (topics != null) {
             List<String> topicsList = gson.fromJson(
@@ -112,7 +114,7 @@
                     topicEntity.put("persistent", persistent);
                 } else {
                     topicEntity.put("topic", topicName);
-                    String metadataTopicUrl = directRequestHost + prefix + "/" + topicName + "/partitions";
+                    String metadataTopicUrl = requestHost + prefix + "/" + topicName + "/partitions";
                     String metadataTopic = HttpUtil.doGet(metadataTopicUrl, header);
                     Map<String, Integer> metadata = gson.fromJson(
                             metadataTopic, new TypeToken<Map<String, Integer>>(){}.getType());
diff --git a/src/main/java/com/manager/pulsar/utils/EnvironmentTools.java b/src/main/java/com/manager/pulsar/utils/EnvironmentTools.java
new file mode 100644
index 0000000..d5c904c
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/utils/EnvironmentTools.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.utils;
+
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Optional;
+
+public class EnvironmentTools {
+
+    public static String getEnvironment(HttpServletRequest request, EnvironmentsRepository environmentsRepository) {
+        String environment = request.getHeader("environment");
+        Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment);
+        EnvironmentEntity environmentEntity = environmentEntityOptional.get();
+        String directRequestHost = environmentEntity.getBroker();
+        return directRequestHost;
+    }
+}
diff --git a/src/main/java/com/manager/pulsar/zuul/EnvirmentForward.java b/src/main/java/com/manager/pulsar/zuul/EnvirmentForward.java
new file mode 100644
index 0000000..80fedda
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/zuul/EnvirmentForward.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.zuul;
+
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+import com.netflix.zuul.ZuulFilter;
+import com.netflix.zuul.context.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
+import org.springframework.stereotype.Component;
+
+import javax.servlet.http.HttpServletRequest;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.PRE_TYPE;
+import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.REQUEST_URI_KEY;
+
+/**
+ * Handle http redirect and forward.
+ */
+@Component
+public class EnvirmentForward extends ZuulFilter {
+
+    private static final Logger log = LoggerFactory.getLogger(EnvirmentForward.class);
+
+
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Override
+    public String filterType() {
+        return PRE_TYPE;
+    }
+
+    @Override
+    public int filterOrder() {
+        return FilterConstants.SEND_FORWARD_FILTER_ORDER;
+    }
+
+    @Override
+    public boolean shouldFilter() {
+        return true;
+    }
+
+    @Override
+    public Object run() {
+
+        RequestContext ctx = RequestContext.getCurrentContext();
+        HttpServletRequest request = ctx.getRequest();
+        String redirect = request.getParameter("redirect");
+
+        if (redirect != null && redirect.equals("true")) {
+            String redirectScheme = request.getParameter("redirect.scheme");
+            String redirectHost = request.getParameter("redirect.host");
+            String redirectPort = request.getParameter("redirect.port");
+            String url = redirectScheme + "://" + redirectHost + ":" + redirectPort;
+            ctx.put(REQUEST_URI_KEY, request.getRequestURI());
+            try {
+                ctx.setRouteHost(new URL(url));
+            } catch(MalformedURLException mue) {
+                log.error("Route redirect to {} path {} error: {}", url, request.getRequestURI(), mue.getMessage());
+            }
+            return null;
+        }
+        String environment = request.getHeader("environment");
+        Optional<EnvironmentEntity> entityOptional = environmentsRepository.findByName(environment);
+        if (entityOptional.isPresent()) {
+            EnvironmentEntity environmentEntity = entityOptional.get();
+            String broker = environmentEntity.getBroker();
+            ctx.put(REQUEST_URI_KEY, request.getRequestURI());
+            try {
+                ctx.setRouteHost(new URL(broker));
+            } catch(MalformedURLException mue) {
+                log.error("Route forward to {} path {} error: {}", broker, request.getRequestURI(), mue.getMessage());
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/src/main/java/com/manager/pulsar/zuul/LocationHeaderRewritingFilter.java b/src/main/java/com/manager/pulsar/zuul/LocationHeaderRewritingFilter.java
new file mode 100644
index 0000000..ac5ecf0
--- /dev/null
+++ b/src/main/java/com/manager/pulsar/zuul/LocationHeaderRewritingFilter.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.zuul;
+
+import com.netflix.util.Pair;
+import com.netflix.zuul.ZuulFilter;
+import com.netflix.zuul.context.RequestContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.netflix.zuul.filters.Route;
+import org.springframework.cloud.netflix.zuul.filters.RouteLocator;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.ServletServerHttpRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.web.util.UriComponents;
+import org.springframework.web.util.UriComponentsBuilder;
+import org.springframework.web.util.UrlPathHelper;
+
+import java.net.URI;
+
+import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.*;
+
+/**
+ * Handle http redirection.
+ */
+@Component
+public class LocationHeaderRewritingFilter extends ZuulFilter {
+
+    private final UrlPathHelper urlPathHelper = new UrlPathHelper();
+
+    @Autowired
+    private RouteLocator routeLocator;
+
+    @Value("${redirect.host}")
+    private String host;
+
+    @Value("${redirect.port}")
+    private String port;
+
+    @Override
+    public String filterType() {
+        return POST_TYPE;
+    }
+
+    @Override
+    public int filterOrder() {
+        return SEND_RESPONSE_FILTER_ORDER - 100;
+    }
+
+    private static final String LOCATION_HEADER = "Location";
+
+    @Override
+    public boolean shouldFilter() {
+        RequestContext ctx = RequestContext.getCurrentContext();
+        int statusCode = ctx.getResponseStatusCode();
+        return HttpStatus.valueOf(statusCode).is3xxRedirection();
+    }
+
+    @Override
+    public Object run() {
+        RequestContext ctx = RequestContext.getCurrentContext();
+
+        Route route = routeLocator.getMatchingRoute(
+                urlPathHelper.getPathWithinApplication(ctx.getRequest()));
+        if (route != null) {
+            Pair<String, String> lh = locationHeader(ctx);
+            if (lh != null) {
+                String location = lh.second();
+                URI originalRequestUri = UriComponentsBuilder
+                        .fromHttpRequest(new ServletServerHttpRequest(ctx.getRequest()))
+                        .build().toUri();
+                UriComponentsBuilder redirectedUriBuilder = UriComponentsBuilder
+                        .fromUriString(location);
+
+                UriComponents redirectedUriComps = redirectedUriBuilder.build();
+
+                String modifiedLocation = redirectedUriBuilder
+                        .scheme(originalRequestUri.getScheme())
+                        .host(host)
+                        .port(port).replacePath(redirectedUriComps.getPath())
+                        .queryParam("redirect", true)
+                        .queryParam("redirect.scheme", redirectedUriComps.getScheme())
+                        .queryParam("redirect.host", redirectedUriComps.getHost())
+                        .queryParam("redirect.port", redirectedUriComps.getPort())
+                        .toUriString();
+                lh.setSecond(modifiedLocation);
+            }
+        }
+        return null;
+    }
+
+    private Pair<String, String> locationHeader(RequestContext ctx) {
+        if (ctx.getZuulResponseHeaders() != null) {
+            for (Pair<String, String> pair : ctx.getZuulResponseHeaders()) {
+                if (pair.first().equals(LOCATION_HEADER)) {
+                    return pair;
+                }
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/src/main/resources/META-INF/sql/schema.sql b/src/main/resources/META-INF/sql/schema.sql
index 3ecac16..46c5ea8 100644
--- a/src/main/resources/META-INF/sql/schema.sql
+++ b/src/main/resources/META-INF/sql/schema.sql
@@ -99,3 +99,10 @@
   CONSTRAINT FK_namespace FOREIGN KEY (namespace) References namespaces(namespace),
   CONSTRAINT PK_bundle PRIMARY KEY (broker, tenant, namespace, bundle)
 );
+
+CREATE TABLE IF NOT EXISTS environments (
+  name varchar(256) NOT NULL,
+  broker varchar(1024) NOT NULL,
+  CONSTRAINT PK_name PRIMARY KEY (name),
+  UNIQUE (broker)
+)
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 71045ca..fb5a371 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -65,4 +65,7 @@
 
 # bookie
 bookie.host=http://localhost:8050
-bookie.enable=false
\ No newline at end of file
+bookie.enable=false
+
+redirect.host=localhost
+redirect.port=9527
\ No newline at end of file
diff --git a/src/test/java/com/manager/pulsar/dao/EnvironmentsRepositoryImplTest.java b/src/test/java/com/manager/pulsar/dao/EnvironmentsRepositoryImplTest.java
new file mode 100644
index 0000000..d6432ba
--- /dev/null
+++ b/src/test/java/com/manager/pulsar/dao/EnvironmentsRepositoryImplTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.manager.pulsar.dao;
+
+import com.github.pagehelper.Page;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Optional;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class EnvironmentsRepositoryImplTest {
+
+    @Autowired
+    private EnvironmentsRepository environmentsRepository;
+
+    @Test
+    public void getEnvironmentsList() {
+        EnvironmentEntity environmentEntity = new EnvironmentEntity();
+        environmentEntity.setName("test-environment");
+        environmentEntity.setBroker("http://localhost:8080");
+        environmentsRepository.save(environmentEntity);
+        Page<EnvironmentEntity> environmentEntityPage = environmentsRepository.getEnvironmentsList(1, 1);
+        environmentEntityPage.count(true);
+        environmentEntityPage.getResult().forEach((result) -> {
+            Assert.assertEquals(result.getName(), "test-environment");
+            Assert.assertEquals(result.getBroker(), "http://localhost:8080");
+            environmentsRepository.remove(result.getName());
+        });
+    }
+
+    @Test
+    public void getAndUpdateEnvironmentByName() {
+        EnvironmentEntity environmentEntity = new EnvironmentEntity();
+        environmentEntity.setName("test-environment");
+        environmentEntity.setBroker("https://localhost:8080");
+        environmentsRepository.save(environmentEntity);
+        Optional<EnvironmentEntity> environmentEntityOptionalGet = environmentsRepository
+                .findByBroker("https://localhost:8080");
+        EnvironmentEntity environmentEntityGet = environmentEntityOptionalGet.get();
+        Assert.assertEquals(environmentEntityGet.getName(), "test-environment");
+        Assert.assertEquals(environmentEntityGet.getBroker(), "https://localhost:8080");
+
+        environmentEntity.setBroker("https://localhost:8081");
+        environmentsRepository.update(environmentEntity);
+        Optional<EnvironmentEntity> environmentEntityOptionalUpdate = environmentsRepository
+                .findByName("test-environment");
+        EnvironmentEntity environmentEntityUpdate = environmentEntityOptionalUpdate.get();
+        Assert.assertEquals(environmentEntityUpdate.getName(), "test-environment");
+        Assert.assertEquals(environmentEntityUpdate.getBroker(), "https://localhost:8081");
+
+        environmentsRepository.remove(environmentEntityUpdate.getName());
+    }
+
+
+}
diff --git a/src/test/java/com/manager/pulsar/service/BrokersServiceImplTest.java b/src/test/java/com/manager/pulsar/service/BrokersServiceImplTest.java
index 4f7e487..a63cf59 100644
--- a/src/test/java/com/manager/pulsar/service/BrokersServiceImplTest.java
+++ b/src/test/java/com/manager/pulsar/service/BrokersServiceImplTest.java
@@ -15,6 +15,9 @@
 
 
 import com.google.common.collect.Maps;
+import com.manager.pulsar.entity.EnvironmentEntity;
+import com.manager.pulsar.entity.EnvironmentsRepository;
+import com.manager.pulsar.utils.EnvironmentTools;
 import com.manager.pulsar.utils.HttpUtil;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,9 +30,13 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
 
+import javax.servlet.http.HttpServletRequest;
 import java.util.Map;
 
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(SpringRunner.class)
 @PowerMockIgnore( {"javax.management.*", "javax.net.ssl.*"})
@@ -41,16 +48,20 @@
     private BrokersService brokersService;
 
     @Test
-    public void brokersServiceTest() {
+    public void brokersServiceTest() throws Exception{
         PowerMockito.mockStatic(HttpUtil.class);
         Map<String, String> header = Maps.newHashMap();
         header.put("Content-Type", "application/json");
         PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/clusters/standalone/failureDomains", header))
                 .thenReturn("{\"test\":{\"brokers\":[\"tengdeMBP:8080\"]}}");
 
+        EnvironmentEntity environmentEntity = new EnvironmentEntity();
+        environmentEntity.setName("test-environment");
+        environmentEntity.setBroker("http://localhost:8080");
         PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
                 .thenReturn("[\"tengdeMBP:8080\"]");
-        Map<String, Object> result = brokersService.getBrokersList(1, 1, "standalone");
+        Map<String, Object> result = brokersService.getBrokersList(
+                1, 1, "standalone", "http://localhost:8080");
         Assert.assertEquals(result.get("total"), 1);
         Assert.assertEquals(result.get("data").toString(), "[{failureDomain=[test], broker=tengdeMBP:8080}]");
         Assert.assertEquals(result.get("pageSize"), 1);
diff --git a/src/test/java/com/manager/pulsar/service/ClustersServiceImplTest.java b/src/test/java/com/manager/pulsar/service/ClustersServiceImplTest.java
index 3657597..72799e5 100644
--- a/src/test/java/com/manager/pulsar/service/ClustersServiceImplTest.java
+++ b/src/test/java/com/manager/pulsar/service/ClustersServiceImplTest.java
@@ -57,7 +57,7 @@
 
         PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header))
                 .thenReturn("[\"tengdeMBP:8080\"]");
-        Map<String, Object> result = clustersService.getClustersList(1, 1);
+        Map<String, Object> result = clustersService.getClustersList(1, 1, "http://localhost:8080");
         Assert.assertEquals(result.get("data").toString(),
                 "[{cluster=standalone, serviceUrlTls=null, brokers=1, serviceUrl=http://tengdeMBP:8080, " +
                         "brokerServiceUrlTls=null, brokerServiceUrl=pulsar://tengdeMBP:6650}]");
diff --git a/src/test/java/com/manager/pulsar/service/NamespacesServiceImplTest.java b/src/test/java/com/manager/pulsar/service/NamespacesServiceImplTest.java
index 55d6634..c660cf1 100644
--- a/src/test/java/com/manager/pulsar/service/NamespacesServiceImplTest.java
+++ b/src/test/java/com/manager/pulsar/service/NamespacesServiceImplTest.java
@@ -52,7 +52,7 @@
         PowerMockito.when(HttpUtil.doGet(
                 "http://localhost:8080/admin/v2/persistent/public/default/partitioned", header))
                 .thenReturn("[]");
-        Map<String, Object> result = namespacesService.getNamespaceList(1, 1, "public");
+        Map<String, Object> result = namespacesService.getNamespaceList(1, 1, "public", "http://localhost:8080");
         Assert.assertEquals(result.get("total"), 1);
         Assert.assertFalse((Boolean) result.get("isPage"));
         Assert.assertEquals(result.get("data").toString(), "[{topics=1, namespace=default}]");
diff --git a/src/test/java/com/manager/pulsar/service/TenantsServiceImplTest.java b/src/test/java/com/manager/pulsar/service/TenantsServiceImplTest.java
index ec715d5..8f066a4 100644
--- a/src/test/java/com/manager/pulsar/service/TenantsServiceImplTest.java
+++ b/src/test/java/com/manager/pulsar/service/TenantsServiceImplTest.java
@@ -52,7 +52,7 @@
                 .thenReturn("{\"adminRoles\": [\"admin\"], \"allowedClusters\": [\"standalone\"]}");
         PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/namespaces/public", header))
                 .thenReturn("[\"public/default\"]");
-        Map<String, Object> objectMap = tenantsService.getTenantsList(1, 2);
+        Map<String, Object> objectMap = tenantsService.getTenantsList(1, 2, "http://localhost:8080");
         Assert.assertEquals(objectMap.get("total"), 1);
         Assert.assertEquals(objectMap.get("pageSize"), 1);
         Assert.assertEquals(objectMap.get("pageNum"), 1);
diff --git a/src/test/java/com/manager/pulsar/service/TopicsServiceImplTest.java b/src/test/java/com/manager/pulsar/service/TopicsServiceImplTest.java
index 1701e99..dfe7fd0 100644
--- a/src/test/java/com/manager/pulsar/service/TopicsServiceImplTest.java
+++ b/src/test/java/com/manager/pulsar/service/TopicsServiceImplTest.java
@@ -61,7 +61,7 @@
                 "http://localhost:8080/admin/v2/persistent/public/default/test900/partitions", header))
                 .thenReturn("{\"partitions\":3}");
         Map<String, Object> topicsMap = topicsService.getTopicsList(
-                1, 1, "public", "default");
+                1, 1, "public", "default", "http://localhost:8080");
         Assert.assertEquals(topicsMap.get("total"), 2);
         Assert.assertFalse((Boolean) topicsMap.get("isPage"));
         Assert.assertEquals(topicsMap.get("topics").toString(),