#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
include Java
java_import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil
java_import org.apache.hadoop.hbase.replication.SyncReplicationState
java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
java_import org.apache.hadoop.hbase.util.Bytes
java_import org.apache.hadoop.hbase.zookeeper.ZKConfig
java_import org.apache.hadoop.hbase.TableName
# Used for replication administrative operations.
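# A rough usage sketch (illustrative only; the peer id and cluster key are made
# up, and CLUSTER_KEY is the HBaseConstants key consumed by add_peer below --
# in practice these methods are reached through the replication shell commands):
#   rep_admin = Hbase::RepAdmin.new(configuration)
#   rep_admin.add_peer('1', CLUSTER_KEY => 'zk1,zk2,zk3:2181:/hbase')
#   rep_admin.list_peers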
module Hbase
class RepAdmin
include HBaseConstants
def initialize(configuration)
@configuration = configuration
@admin = ConnectionFactory.createConnection(configuration).getAdmin
end
#----------------------------------------------------------------------------------------------
# Add a new peer cluster to replicate to
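# Example (illustrative values; the hash keys are the HBaseConstants constants
# read from args below, e.g. CLUSTER_KEY, TABLE_CFS and STATE):
#   add_peer('1', CLUSTER_KEY => 'zk1,zk2,zk3:2181:/hbase',
#                 TABLE_CFS => { 'ns1:table1' => ['cf1'] },
#                 STATE => 'ENABLED')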
def add_peer(id, args = {}, peer_tableCFs = nil)
if args.is_a?(Hash)
unless peer_tableCFs.nil?
raise(ArgumentError, 'peer_tableCFs should be specified as TABLE_CFS in args')
end
endpoint_classname = args.fetch(ENDPOINT_CLASSNAME, nil)
cluster_key = args.fetch(CLUSTER_KEY, nil)
# Require at least one of a custom replication endpoint and a cluster key
if endpoint_classname.nil? && cluster_key.nil?
raise(ArgumentError, 'Either ENDPOINT_CLASSNAME or CLUSTER_KEY must be specified.')
end
# ReplicationPeerConfig still requires a cluster key, so default to the local
# cluster's key when only a custom replication endpoint is given
if !endpoint_classname.nil? && cluster_key.nil?
cluster_key = ZKConfig.getZooKeeperClusterKey(@configuration)
end
# Optional parameters
config = args.fetch(CONFIG, nil)
data = args.fetch(DATA, nil)
table_cfs = args.fetch(TABLE_CFS, nil)
namespaces = args.fetch(NAMESPACES, nil)
peer_state = args.fetch(STATE, nil)
remote_wal_dir = args.fetch(REMOTE_WAL_DIR, nil)
serial = args.fetch(SERIAL, nil)
# Create and populate a ReplicationPeerConfig
builder = ReplicationPeerConfig.newBuilder()
builder.set_cluster_key(cluster_key)
unless endpoint_classname.nil?
builder.set_replication_endpoint_impl(endpoint_classname)
end
unless remote_wal_dir.nil?
builder.setRemoteWALDir(remote_wal_dir)
end
unless serial.nil?
builder.setSerial(serial)
end
unless config.nil?
builder.putAllConfiguration(config)
end
unless data.nil?
# Convert Strings to Bytes for peer_data
data.each do |key, val|
builder.putPeerData(Bytes.to_bytes(key), Bytes.to_bytes(val))
end
end
unless namespaces.nil?
ns_set = java.util.HashSet.new
namespaces.each do |n|
ns_set.add(n)
end
builder.setReplicateAllUserTables(false)
builder.set_namespaces(ns_set)
end
unless table_cfs.nil?
# convert table_cfs to TableName
map = java.util.HashMap.new
table_cfs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
builder.setReplicateAllUserTables(false)
builder.set_table_cfs_map(map)
end
enabled = true
unless peer_state.nil?
enabled = false if peer_state == 'DISABLED'
end
@admin.addReplicationPeer(id, builder.build, enabled)
else
raise(ArgumentError, 'args must be a Hash')
end
end
#----------------------------------------------------------------------------------------------
# Remove a peer cluster and stop replication to it
def remove_peer(id)
@admin.removeReplicationPeer(id)
end
#---------------------------------------------------------------------------------------------
# Show replicated tables/column families, and their ReplicationType
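# Example (illustrative regex; matches table names such as 'ns1:table1'):
#   list_replicated_tables('ns1:.*')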
def list_replicated_tables(regex = '.*')
pattern = java.util.regex.Pattern.compile(regex)
list = @admin.listReplicatedTableCFs
list.select { |t| pattern.match(t.getTable.getNameAsString) }
end
#----------------------------------------------------------------------------------------------
# List all peer clusters
def list_peers
@admin.listReplicationPeers
end
#----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)
@admin.enableReplicationPeer(id)
end
#----------------------------------------------------------------------------------------------
# Stop the replication stream to the specified peer
def disable_peer(id)
@admin.disableReplicationPeer(id)
end
#----------------------------------------------------------------------------------------------
# Show the current tableCFs config for the specified peer
def show_peer_tableCFs(id)
rpc = @admin.getReplicationPeerConfig(id)
show_peer_tableCFs_by_config(rpc)
end
def show_peer_tableCFs_by_config(peer_config)
ReplicationPeerConfigUtil.convertToString(peer_config.getTableCFsMap)
end
#----------------------------------------------------------------------------------------------
# Set new tableCFs config for the specified peer
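# Example (illustrative; a nil value is passed through for a table whose
# column families are not restricted):
#   set_peer_tableCFs('1', 'ns1:table1' => ['cf1', 'cf2'], 'ns1:table2' => nil)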
def set_peer_tableCFs(id, tableCFs)
unless tableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
tableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
rpc = get_peer_config(id)
unless rpc.nil?
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setTableCFsMap(map)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
#----------------------------------------------------------------------------------------------
# Append a tableCFs config for the specified peer
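# Example (illustrative; adds 'cf2' of ns1:table1 to the existing tableCFs config):
#   append_peer_tableCFs('1', 'ns1:table1' => ['cf2'])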
def append_peer_tableCFs(id, tableCFs)
unless tableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
tableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
end
@admin.appendReplicationPeerTableCFs(id, map)
end
#----------------------------------------------------------------------------------------------
# Remove some tableCFs from the tableCFs config of the specified peer
def remove_peer_tableCFs(id, tableCFs)
unless tableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
tableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
end
@admin.removeReplicationPeerTableCFs(id, map)
end
# Append exclude-tableCFs to the exclude-tableCFs config for the specified peer
def append_peer_exclude_tableCFs(id, excludeTableCFs)
unless excludeTableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
excludeTableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
rpc = get_peer_config(id)
unless rpc.nil?
rpc = ReplicationPeerConfigUtil.appendExcludeTableCFsToReplicationPeerConfig(map, rpc)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
end
# Remove some exclude-tableCFs from the exclude-tableCFs config for the specified peer
def remove_peer_exclude_tableCFs(id, excludeTableCFs)
unless excludeTableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
excludeTableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
rpc = get_peer_config(id)
unless rpc.nil?
rpc = ReplicationPeerConfigUtil.removeExcludeTableCFsFromReplicationPeerConfig(map, rpc, id)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
end
# Set new namespaces config for the specified peer
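# Example (illustrative namespace names):
#   set_peer_namespaces('1', ['ns1', 'ns2'])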
def set_peer_namespaces(id, namespaces)
unless namespaces.nil?
ns_set = java.util.HashSet.new
namespaces.each do |n|
ns_set.add(n)
end
rpc = get_peer_config(id)
unless rpc.nil?
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
# Add some namespaces for the specified peer
def add_peer_namespaces(id, namespaces)
unless namespaces.nil?
rpc = get_peer_config(id)
unless rpc.nil?
if rpc.getNamespaces.nil?
ns_set = java.util.HashSet.new
else
ns_set = java.util.HashSet.new(rpc.getNamespaces)
end
namespaces.each do |n|
ns_set.add(n)
end
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
# Remove some namespaces for the specified peer
def remove_peer_namespaces(id, namespaces)
unless namespaces.nil?
rpc = get_peer_config(id)
unless rpc.nil?
ns_set = rpc.getNamespaces
unless ns_set.nil?
ns_set = java.util.HashSet.new(ns_set)
namespaces.each do |n|
ns_set.remove(n)
end
end
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
# Show the current namespaces config for the specified peer
def show_peer_namespaces(peer_config)
namespaces = peer_config.get_namespaces
if !namespaces.nil?
namespaces = java.util.ArrayList.new(namespaces)
java.util.Collections.sort(namespaces)
return namespaces.join(';')
else
return nil
end
end
# Set new bandwidth config for the specified peer
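# Example (illustrative value; the bandwidth is given in bytes per second,
# with 0 meaning unthrottled):
#   set_peer_bandwidth('1', 2097152)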
def set_peer_bandwidth(id, bandwidth)
rpc = get_peer_config(id)
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setBandwidth(bandwidth).build
@admin.updateReplicationPeerConfig(id, rpc)
end
# Append exclude namespaces config for the specified peer
def append_peer_exclude_namespaces(id, namespaces)
unless namespaces.nil?
rpc = get_peer_config(id)
unless rpc.nil?
if rpc.getExcludeNamespaces.nil?
ns_set = java.util.HashSet.new
else
ns_set = java.util.HashSet.new(rpc.getExcludeNamespaces)
end
namespaces.each do |n|
ns_set.add(n)
end
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setExcludeNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
# Remove exclude namespaces config for the specified peer
def remove_peer_exclude_namespaces(id, namespaces)
unless namespaces.nil?
rpc = get_peer_config(id)
unless rpc.nil?
ns_set = rpc.getExcludeNamespaces
unless ns_set.nil?
ns_set = java.util.HashSet.new(ns_set)
namespaces.each do |n|
ns_set.remove(n)
end
end
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setExcludeNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end
def set_peer_replicate_all(id, replicate_all)
rpc = get_peer_config(id)
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(replicate_all).build
@admin.updateReplicationPeerConfig(id, rpc)
end
def set_peer_serial(id, peer_serial)
rpc = get_peer_config(id)
return if rpc.nil?
rpc_builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
.newBuilder(rpc)
new_rpc = rpc_builder.setSerial(peer_serial).build
@admin.updateReplicationPeerConfig(id, new_rpc)
end
# Set exclude namespaces config for the specified peer
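# Example (illustrative namespace name):
#   set_peer_exclude_namespaces('1', ['ns3'])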
def set_peer_exclude_namespaces(id, exclude_namespaces)
return if exclude_namespaces.nil?
exclude_ns_set = java.util.HashSet.new
exclude_namespaces.each do |n|
exclude_ns_set.add(n)
end
rpc = get_peer_config(id)
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setExcludeNamespaces(exclude_ns_set).build
@admin.updateReplicationPeerConfig(id, rpc)
end
# Show the exclude namespaces config for the specified peer
def show_peer_exclude_namespaces(peer_config)
namespaces = peer_config.getExcludeNamespaces
return nil if namespaces.nil?
namespaces = java.util.ArrayList.new(namespaces)
java.util.Collections.sort(namespaces)
'!' + namespaces.join(';')
end
# Set exclude tableCFs config for the specified peer
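# Example (illustrative table and column family):
#   set_peer_exclude_tableCFs('1', 'ns1:table3' => ['cf1'])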
def set_peer_exclude_tableCFs(id, exclude_tableCFs)
return if exclude_tableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
exclude_tableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
rpc = get_peer_config(id)
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setExcludeTableCFsMap(map).build
@admin.updateReplicationPeerConfig(id, rpc)
end
# Show the exclude tableCFs config for the specified peer
def show_peer_exclude_tableCFs(peer_config)
tableCFs = peer_config.getExcludeTableCFsMap
return nil if tableCFs.nil?
'!' + ReplicationPeerConfigUtil.convertToString(tableCFs)
end
# Transition the current cluster to a new state in the specified synchronous
# replication peer
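# Example (illustrative peer id):
#   transit_peer_sync_replication_state('1', 'DOWNGRADE_ACTIVE')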
def transit_peer_sync_replication_state(id, state)
if 'ACTIVE'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::ACTIVE)
elsif 'DOWNGRADE_ACTIVE'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::DOWNGRADE_ACTIVE)
elsif 'STANDBY'.eql?(state)
@admin.transitReplicationPeerSyncReplicationState(id, SyncReplicationState::STANDBY)
else
raise(ArgumentError, 'synchronous replication state must be ACTIVE, DOWNGRADE_ACTIVE or STANDBY')
end
end
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
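# Example (illustrative table name):
#   enable_tablerep('ns1:table1')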
def enable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@admin.enableTableReplication(tableName)
end
#----------------------------------------------------------------------------------------------
# Disables a table's replication switch
def disable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@admin.disableTableReplication(tableName)
end
def list_peer_configs
map = java.util.HashMap.new
peers = @admin.listReplicationPeers
peers.each do |peer|
map.put(peer.getPeerId, peer.getPeerConfig)
end
map
end
def get_peer_config(id)
@admin.getReplicationPeerConfig(id)
end
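# Update the configuration and/or peer data of an existing peer; only the
# CONFIG and DATA entries of args are read. Example (illustrative keys/values):
#   update_peer_config('1', CONFIG => { 'replication.source.maxretriesmultiplier' => '300' },
#                           DATA => { 'key1' => 'value1' })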
def update_peer_config(id, args = {})
# Optional parameters
config = args.fetch(CONFIG, nil)
data = args.fetch(DATA, nil)
# Create and populate a ReplicationPeerConfig
replication_peer_config = get_peer_config(id)
builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
.newBuilder(replication_peer_config)
unless config.nil?
builder.putAllConfiguration(config)
end
unless data.nil?
# Convert Strings to Bytes for peer_data
data.each do |key, val|
builder.putPeerData(Bytes.to_bytes(key), Bytes.to_bytes(val))
end
end
@admin.updateReplicationPeerConfig(id, builder.build)
end
end
end