| # |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| include Java |
| java_import java.util.Arrays |
| java_import java.util.regex.Pattern |
| java_import org.apache.hadoop.hbase.util.Pair |
| java_import org.apache.hadoop.hbase.util.RegionSplitter |
| java_import org.apache.hadoop.hbase.util.Bytes |
| java_import org.apache.hadoop.hbase.ServerName |
| java_import org.apache.hadoop.hbase.TableName |
| java_import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder |
| java_import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder |
| java_import org.apache.hadoop.hbase.client.TableDescriptorBuilder |
| java_import org.apache.hadoop.hbase.HConstants |
| |
# Wrapper for org.apache.hadoop.hbase.client.Admin
| |
| module Hbase |
| # rubocop:disable Metrics/ClassLength |
| class Admin |
| include HBaseConstants |
| |
| def initialize(connection) |
| @connection = connection |
| # Java Admin instance |
| @admin = @connection.getAdmin |
| @hbck = @connection.getHbck |
| @conf = @connection.getConfiguration |
| end |
| |
| def close |
| @admin.close |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of tables in hbase |
| def list(regex = '.*') |
| @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString) |
| end |
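    # Usage sketch (illustrative; `admin` is this Hbase::Admin wrapper, names are hypothetical):
    #   admin.list              #=> every table name as a String
    #   admin.list('ns1:t.*')   #=> only names matching the regex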
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a table or region or region server flush |
| def flush(name, family = nil) |
| family_bytes = nil |
| family_bytes = family.to_java_bytes unless family.nil? |
| if family_bytes.nil? |
| @admin.flushRegion(name.to_java_bytes) |
| else |
| @admin.flushRegion(name.to_java_bytes, family_bytes) |
| end |
| rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException |
| # Unknown region. Try table. |
| begin |
| if family_bytes.nil? |
| @admin.flush(TableName.valueOf(name)) |
| else |
| @admin.flush(TableName.valueOf(name), family_bytes) |
| end |
| rescue java.lang.IllegalArgumentException |
| # Unknown table. Try region server. |
| @admin.flushRegionServer(ServerName.valueOf(name)) |
| end |
| end |
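    # Usage sketch (illustrative names; shows the region -> table -> region server fallback):
    #   admin.flush('encoded_region_name')                    # tries a region flush first
    #   admin.flush('t1', 'cf1')                              # unknown region: falls back to table + family
    #   admin.flush('host1.example.com,16020,1600000000000')  # unknown table: flushes a region server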
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a table or region or column family compaction |
| def compact(table_or_region_name, family = nil, type = 'NORMAL') |
| family_bytes = nil |
| family_bytes = family.to_java_bytes unless family.nil? |
| compact_type = nil |
| if type == 'NORMAL' |
| compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL |
| elsif type == 'MOB' |
| compact_type = org.apache.hadoop.hbase.client.CompactType::MOB |
| else |
| raise ArgumentError, 'only NORMAL or MOB accepted for type!' |
| end |
| |
| begin |
| if family_bytes.nil? |
| @admin.compactRegion(table_or_region_name.to_java_bytes) |
| else |
| @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes) |
| end |
| rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException |
| if family_bytes.nil? |
| @admin.compact(TableName.valueOf(table_or_region_name), compact_type) |
| else |
| @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type) |
| end |
| end |
| end |
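    # Usage sketch (illustrative; MOB type requires a MOB-enabled column family):
    #   admin.compact('t1')                # minor compaction of a table (or a region name)
    #   admin.compact('t1', 'cf1')         # restrict the request to one column family
    #   admin.compact('t1', nil, 'MOB')    # MOB compaction across all families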
| |
| #---------------------------------------------------------------------------------------------- |
| # Switch compaction on/off at runtime on a region server |
| def compaction_switch(on_or_off, regionserver_names) |
| region_servers = regionserver_names.flatten.compact |
| servers = java.util.ArrayList.new |
| if region_servers.any? |
| region_servers.each do |s| |
| servers.add(s) |
| end |
| end |
| @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers) |
| end |
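    # Usage sketch (hypothetical server name; returns the previous state per region server):
    #   admin.compaction_switch(false, ['host1.example.com,16020,1600000000000'])
    #   admin.compaction_switch(true, [])   # an empty list targets all region servers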
| |
| #---------------------------------------------------------------------------------------------- |
| # Gets compaction state for specified table |
| def getCompactionState(table_name) |
| @admin.getCompactionState(TableName.valueOf(table_name)).name |
| end |
| |
| # Requests to compact all regions on the regionserver |
| def compact_regionserver(servername, major = false) |
| if major |
| @admin.majorCompactRegionServer(ServerName.valueOf(servername)) |
| else |
| @admin.compactRegionServer(ServerName.valueOf(servername)) |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a table or region or column family major compaction |
| def major_compact(table_or_region_name, family = nil, type = 'NORMAL') |
| family_bytes = nil |
| family_bytes = family.to_java_bytes unless family.nil? |
| compact_type = nil |
| if type == 'NORMAL' |
| compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL |
| elsif type == 'MOB' |
| compact_type = org.apache.hadoop.hbase.client.CompactType::MOB |
| else |
| raise ArgumentError, 'only NORMAL or MOB accepted for type!' |
| end |
| |
| begin |
| if family_bytes.nil? |
| @admin.majorCompactRegion(table_or_region_name.to_java_bytes) |
| else |
| @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes) |
| end |
| rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException |
| if family_bytes.nil? |
| @admin.majorCompact(TableName.valueOf(table_or_region_name), compact_type) |
| else |
| @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type) |
| end |
| end |
| end |
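    # Usage sketch (illustrative; same argument forms as #compact above):
    #   admin.major_compact('t1')
    #   admin.major_compact('t1', 'cf1', 'MOB')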
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a regionserver's WAL roll |
| def wal_roll(server_name) |
| @admin.rollWALWriter(ServerName.valueOf(server_name)) |
| end |
| # TODO: remove older hlog_roll version |
| alias hlog_roll wal_roll |
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a table or region split |
| def split(table_or_region_name, split_point = nil) |
| split_point_bytes = nil |
| split_point_bytes = split_point.to_java_bytes unless split_point.nil? |
| begin |
| if split_point_bytes.nil? |
| org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes)) |
| else |
| org.apache.hadoop.hbase.util.FutureUtils.get(@admin.splitRegionAsync(table_or_region_name.to_java_bytes, split_point_bytes)) |
| end |
| rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException |
| if split_point_bytes.nil? |
| @admin.split(TableName.valueOf(table_or_region_name)) |
| else |
| @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes) |
| end |
| end |
| end |
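    # Usage sketch (illustrative; tries a region split first, then falls back to a table split):
    #   admin.split('t1')               # let HBase pick the split point(s)
    #   admin.split('t1', 'row5000')    # split at an explicit row key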
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable one split or merge switch |
| # Returns previous switch setting. |
| def splitormerge_switch(type, enabled) |
| if type == 'SPLIT' |
| @admin.splitSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false)) |
| elsif type == 'MERGE' |
| @admin.mergeSwitch(java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false)) |
| else |
| raise ArgumentError, 'only SPLIT or MERGE accepted for type!' |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query the current state of the split or merge switch. |
| # Returns the switch's state (true is enabled). |
| def splitormerge_enabled(type) |
| if type == 'SPLIT' |
| @admin.isSplitEnabled |
| elsif type == 'MERGE' |
| @admin.isMergeEnabled |
| else |
| raise ArgumentError, 'only SPLIT or MERGE accepted for type!' |
| end |
| end |
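    # Usage sketch (illustrative):
    #   admin.splitormerge_switch('SPLIT', false)  #=> previous switch setting
    #   admin.splitormerge_enabled('MERGE')        #=> true if merges are currently enabled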
| |
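    #----------------------------------------------------------------------------------------------
    # Returns the HRegionLocation hosting the given row of the specified table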
| def locate_region(table_name, row_key) |
| locator = @connection.getRegionLocator(TableName.valueOf(table_name)) |
| begin |
| return locator.getRegionLocation(Bytes.toBytesBinary(row_key)) |
| ensure |
| locator.close |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests a cluster balance |
| # Returns true if balancer ran |
| def balancer(force) |
| @admin.balance(java.lang.Boolean.valueOf(force)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable balancer |
| # Returns previous balancer switch setting. |
| def balance_switch(enableDisable) |
| @admin.balancerSwitch( |
| java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false) |
| ) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query the current state of the LoadBalancer. |
| # Returns the balancer's state (true is enabled). |
| def balancer_enabled? |
| @admin.isBalancerEnabled |
| end |
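    # Usage sketch (illustrative):
    #   admin.balance_switch(true)  #=> previous switch setting
    #   admin.balancer(false)       #=> true if a balancer run was triggered
    #   admin.balancer_enabled?     #=> current switch state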
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests clear block cache for table |
| def clear_block_cache(table_name) |
| @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Requests region normalization for all configured tables in the cluster |
| # Returns true if normalize request was successfully submitted |
| def normalize(*args) |
| builder = org.apache.hadoop.hbase.client.NormalizeTableFilterParams::Builder.new |
| args.each do |arg| |
        unless arg.is_a?(Hash)
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
        end
| |
| if arg.key?(TABLE_NAME) |
| table_name = arg.delete(TABLE_NAME) |
| unless table_name.is_a?(String) |
| raise(ArgumentError, "#{TABLE_NAME} must be of type String") |
| end |
| |
| builder.tableNames(java.util.Collections.singletonList(TableName.valueOf(table_name))) |
| elsif arg.key?(TABLE_NAMES) |
| table_names = arg.delete(TABLE_NAMES) |
| unless table_names.is_a?(Array) |
| raise(ArgumentError, "#{TABLE_NAMES} must be of type Array") |
| end |
| |
| table_name_list = java.util.LinkedList.new |
| table_names.each do |tn| |
| unless tn.is_a?(String) |
| raise(ArgumentError, "#{TABLE_NAMES} value #{tn} must be of type String") |
| end |
| |
| table_name_list.add(TableName.valueOf(tn)) |
| end |
| builder.tableNames(table_name_list) |
| elsif arg.key?(REGEX) |
| regex = arg.delete(REGEX) |
| raise(ArgumentError, "#{REGEX} must be of type String") unless regex.is_a?(String) |
| |
| builder.regex(regex) |
| elsif arg.key?(NAMESPACE) |
| namespace = arg.delete(NAMESPACE) |
| unless namespace.is_a?(String) |
| raise(ArgumentError, "#{NAMESPACE} must be of type String") |
| end |
| |
| builder.namespace(namespace) |
| else |
| raise(ArgumentError, "Unrecognized argument #{arg}") |
| end |
| end |
| ntfp = builder.build |
| @admin.normalize(ntfp) |
| end |
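    # Usage sketch (illustrative; filter keys come from HBaseConstants, one filter per Hash):
    #   admin.normalize                        # all tables eligible for normalization
    #   admin.normalize(TABLE_NAME => 't1')    # a single table
    #   admin.normalize(REGEX => 'ns1:.*')     # tables matching a regex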
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable region normalizer |
| # Returns previous normalizer switch setting. |
| def normalizer_switch(enableDisable) |
| @admin.normalizerSwitch(java.lang.Boolean.valueOf(enableDisable)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query the current state of region normalizer. |
| # Returns the state of region normalizer (true is enabled). |
| def normalizer_enabled? |
| @admin.isNormalizerEnabled |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query the current state of master in maintenance mode. |
| # Returns the state of maintenance mode (true is on). |
| def in_maintenance_mode? |
| @admin.isMasterInMaintenanceMode |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Request HBCK chore to run |
| def hbck_chore_run |
| @hbck.runHbckChore |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Request a scan of the catalog table (for garbage collection) |
| # Returns an int signifying the number of entries cleaned |
| def catalogjanitor_run |
| @admin.runCatalogJanitor |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable the catalog janitor |
| # Returns previous catalog janitor switch setting. |
| def catalogjanitor_switch(enableDisable) |
| @admin.catalogJanitorSwitch(java.lang.Boolean.valueOf(enableDisable)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query on the catalog janitor state (enabled/disabled?) |
| # Returns catalog janitor state (true signifies enabled). |
| def catalogjanitor_enabled |
| @admin.isCatalogJanitorEnabled |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Request cleaner chore to run (for garbage collection of HFiles and WAL files) |
| def cleaner_chore_run |
| @admin.runCleanerChore |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable the cleaner chore |
| # Returns previous cleaner switch setting. |
| def cleaner_chore_switch(enableDisable) |
| @admin.cleanerChoreSwitch(java.lang.Boolean.valueOf(enableDisable)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query on the cleaner chore state (enabled/disabled?) |
| # Returns cleaner state (true signifies enabled). |
| def cleaner_chore_enabled |
| @admin.isCleanerChoreEnabled |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enables a table |
| def enable(table_name) |
| tableExists(table_name) |
| return if enabled?(table_name) |
| @admin.enableTable(TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enables all tables matching the given regex |
| def enable_all(regex) |
| pattern = Pattern.compile(regex.to_s) |
| failed = java.util.ArrayList.new |
| @admin.listTableNames(pattern).each do |table_name| |
| begin |
| @admin.enableTable(table_name) |
| rescue java.io.IOException => e |
| puts "table:#{table_name}, error:#{e.toString}" |
| failed.add(table_name) |
| end |
| end |
| failed |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Disables a table |
| def disable(table_name) |
| tableExists(table_name) |
| return if disabled?(table_name) |
| @admin.disableTable(TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Disables all tables matching the given regex |
| def disable_all(regex) |
| pattern = Pattern.compile(regex.to_s) |
| failed = java.util.ArrayList.new |
| @admin.listTableNames(pattern).each do |table_name| |
| begin |
| @admin.disableTable(table_name) |
| rescue java.io.IOException => e |
| puts "table:#{table_name}, error:#{e.toString}" |
| failed.add(table_name) |
| end |
| end |
| failed |
| end |
| |
| #--------------------------------------------------------------------------------------------- |
| # Throw exception if table doesn't exist |
| def tableExists(table_name) |
| raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Is table disabled? |
| def disabled?(table_name) |
| @admin.isTableDisabled(TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Drops a table |
| def drop(table_name) |
| tableExists(table_name) |
| raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?( |
| table_name |
| ) |
| |
| @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Drops all tables matching the given regex
| def drop_all(regex) |
| pattern = Pattern.compile(regex.to_s) |
| failed = java.util.ArrayList.new |
| @admin.listTableNames(pattern).each do |table_name| |
| begin |
| @admin.deleteTable(table_name) |
| rescue java.io.IOException => e |
          puts "table:#{table_name}, error:#{e.toString}"
| failed.add(table_name) |
| end |
| end |
| failed |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns ZooKeeper status dump |
| def zk_dump |
| @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new( |
| @admin.getConfiguration, |
| 'admin', |
| nil |
| ) |
| zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper |
| @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk) |
| org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Creates a table |
| def create(table_name, *args) |
| # Fail if table name is not a string |
| raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String) |
| |
| # Flatten params array |
| args = args.flatten.compact |
| has_columns = false |
| |
| # Start defining the table |
| tdb = TableDescriptorBuilder.newBuilder(TableName.valueOf(table_name)) |
| splits = nil |
| # Args are either columns or splits, add them to the table definition |
| # TODO: add table options support |
| args.each do |arg| |
| unless arg.is_a?(String) || arg.is_a?(Hash) |
| raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type") |
| end |
| |
| # First, handle all the cases where arg is a column family. |
| if arg.is_a?(String) || arg.key?(NAME) |
| # If the arg is a string, default action is to add a column to the table. |
| # If arg has a name, it must also be a column descriptor. |
| descriptor = cfd(arg, tdb) |
| # Warn if duplicate columns are added |
| if tdb.build.hasColumnFamily(descriptor.getName) |
| puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced" |
| tdb.modifyColumnFamily(descriptor) |
| else |
| tdb.setColumnFamily(descriptor) |
| end |
| has_columns = true |
| next |
| end |
| if arg.key?(REGION_REPLICATION) |
| region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION)) |
| tdb.setRegionReplication(region_replication) |
| end |
| |
| # Get rid of the "METHOD", which is deprecated for create. |
| # We'll do whatever it used to do below if it's table_att. |
| if (method = arg.delete(METHOD)) |
| raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att' |
| end |
| |
| # The hash is not a column family. Figure out what's in it. |
| # First, handle splits. |
| if arg.key?(SPLITS_FILE) |
| splits_file = arg.delete(SPLITS_FILE) |
| unless File.exist?(splits_file) |
| raise(ArgumentError, "Splits file #{splits_file} doesn't exist") |
| end |
| arg[SPLITS] = [] |
| File.foreach(splits_file) do |line| |
| arg[SPLITS].push(line.chomp) |
| end |
| tdb.setValue(SPLITS_FILE, splits_file) |
| end |
| |
| if arg.key?(SPLITS) |
| splits = Java::byte[][arg[SPLITS].size].new |
| idx = 0 |
| arg.delete(SPLITS).each do |split| |
| splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split) |
| idx += 1 |
| end |
| elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO) |
          # NUMREGIONS/SPLITALGO region pre-split API (deprecated); ignored when SPLITS
          # or SPLITS_FILE was specified above.
| raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS) |
| raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO) |
| raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1 |
| num_regions = arg.delete(NUMREGIONS) |
| split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO)) |
| splits = split_algo.split(JInteger.valueOf(num_regions)) |
| end |
| |
| # Done with splits; apply formerly-table_att parameters. |
| update_tdb_from_arg(tdb, arg) |
| |
| arg.each_key do |ignored_key| |
          puts(format('Argument ignored (unknown or overridden): %s', ignored_key))
| end |
| end |
| |
| # Fail if no column families defined |
| raise(ArgumentError, 'Table must have at least one column family') unless has_columns |
| |
| if splits.nil? |
| # Perform the create table call |
| @admin.createTable(tdb.build) |
| else |
| # Perform the create table call |
| @admin.createTable(tdb.build, splits) |
| end |
| end |
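    # Usage sketch (illustrative, mirroring the shell's create syntax):
    #   admin.create('t1', 'cf1', 'cf2')                           # two families with defaults
    #   admin.create('t1', NAME => 'cf1', VERSIONS => 5)           # a family with options
    #   admin.create('t1', 'cf1', SPLITS => %w[row1000 row2000])   # pre-split table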
| |
| #---------------------------------------------------------------------------------------------- |
| #---------------------------------------------------------------------------------------------- |
| # Assign a region |
| def assign(region_name) |
| @admin.assign(region_name.to_java_bytes) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Unassign a region |
    # The force parameter is deprecated; if specified, it is ignored.
| def unassign(region_name, force = nil) |
| @admin.unassign(region_name.to_java_bytes) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Move a region |
| def move(encoded_region_name, server = nil) |
| @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Merge multiple regions |
| def merge_region(regions, force) |
| unless regions.is_a?(Array) |
| raise(ArgumentError, "Type of #{regions.inspect} is #{regions.class}, but expected Array") |
| end |
| region_array = Java::byte[][regions.length].new |
| i = 0 |
| while i < regions.length |
| unless regions[i].is_a?(String) |
| raise( |
| ArgumentError, |
| "Type of #{regions[i].inspect} is #{regions[i].class}, but expected String" |
| ) |
| end |
| region_array[i] = regions[i].to_java_bytes |
| i += 1 |
| end |
| org.apache.hadoop.hbase.util.FutureUtils.get( |
| @admin.mergeRegionsAsync( |
| region_array, |
| java.lang.Boolean.valueOf(force) |
| ) |
| ) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns table's structure description |
| def describe(table_name) |
| tableExists(table_name) |
| @admin.getDescriptor(TableName.valueOf(table_name)).to_s |
| end |
| |
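    #----------------------------------------------------------------------------------------------
    # Returns the column family descriptors of the given table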
| def get_column_families(table_name) |
| tableExists(table_name) |
| @admin.getDescriptor(TableName.valueOf(table_name)).getColumnFamilies |
| end |
| |
| def get_table_attributes(table_name) |
| tableExists(table_name) |
| td = @admin.getDescriptor TableName.valueOf(table_name) |
| # toStringTableAttributes is a public method, but it is defined on the private class |
| # ModifiableTableDescriptor, so we need reflection to access it in JDK 11+. |
| # TODO Maybe move this to a utility class in the future? |
| method = td.java_class.declared_method :toStringTableAttributes |
| method.accessible = true |
| method.invoke td |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Enable/disable snapshot auto-cleanup based on TTL expiration |
| # Returns previous snapshot auto-cleanup switch setting. |
| def snapshot_cleanup_switch(enable_disable) |
| @admin.snapshotCleanupSwitch( |
| java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false) |
| ) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Query the current state of the snapshot auto-cleanup based on TTL |
| # Returns the snapshot auto-cleanup state (true if enabled) |
| def snapshot_cleanup_enabled? |
| @admin.isSnapshotCleanupEnabled |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Truncates table (deletes all records by recreating the table) |
| def truncate(table_name_str) |
| puts "Truncating '#{table_name_str}' table (it may take a while):" |
| table_name = TableName.valueOf(table_name_str) |
| |
| if enabled?(table_name_str) |
| puts 'Disabling table...' |
| disable(table_name_str) |
| end |
| |
| puts 'Truncating table...' |
| @admin.truncateTable(table_name, false) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Truncates table while maintaining region boundaries |
| # (deletes all records by recreating the table) |
| def truncate_preserve(table_name_str) |
| puts "Truncating '#{table_name_str}' table (it may take a while):" |
| table_name = TableName.valueOf(table_name_str) |
| |
| if enabled?(table_name_str) |
| puts 'Disabling table...' |
| disable(table_name_str) |
| end |
| |
| puts 'Truncating table...' |
| @admin.truncateTable(table_name, true) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Check the status of alter command (number of regions reopened) |
| def alter_status(table_name) |
| # Table name should be a string |
| raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String) |
| |
| # Table should exist |
| raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name) |
| |
| begin |
| cluster_metrics = @admin.getClusterMetrics |
| table_region_status = cluster_metrics |
| .getTableRegionStatesCount |
| .get(org.apache.hadoop.hbase.TableName.valueOf(table_name)) |
| if table_region_status.getTotalRegions != 0 |
| updated_regions = table_region_status.getTotalRegions - |
| table_region_status.getRegionsInTransition - |
| table_region_status.getClosedRegions |
| puts "#{updated_regions}/#{table_region_status.getTotalRegions} regions updated." |
| else |
| puts 'All regions updated.' |
| end |
| sleep 1 |
| end while !table_region_status.nil? && table_region_status.getRegionsInTransition != 0 |
| puts 'Done.' |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Use our internal logic to convert from "spec string" format to a coprocessor descriptor |
| # |
| # Provided for backwards shell compatibility |
| # |
| # @param [String] spec_str |
    # @return [CoprocessorDescriptor]
| def coprocessor_descriptor_from_spec_str(spec_str) |
| method = TableDescriptorBuilder.java_class.declared_method_smart :toCoprocessorDescriptor |
| method.accessible = true |
| result = method.invoke(nil, spec_str).to_java |
      # unpack Java's Optional to be more Ruby-like
| return result.isPresent ? result.get : nil |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Use CoprocessorDescriptorBuilder to convert a Hash to CoprocessorDescriptor |
| # |
    # @param [Hash] spec coprocessor descriptor specification
    # @return [CoprocessorDescriptor]
| def coprocessor_descriptor_from_hash(spec) |
| classname = spec[CLASSNAME] |
| raise ArgumentError.new "CLASSNAME must be provided in spec" if classname.nil? |
| jar_path = spec[JAR_PATH] |
| priority = spec[PRIORITY] |
| properties = spec[PROPERTIES] |
| |
| builder = CoprocessorDescriptorBuilder.newBuilder classname |
| builder.setJarPath jar_path unless jar_path.nil? |
| builder.setPriority priority unless priority.nil? |
| properties&.each { |k, v| builder.setProperty(k, v.to_s) } |
| |
| builder.build |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Change table structure or table options |
| def alter(table_name_str, wait = true, *args) |
| # Table name should be a string |
| raise(ArgumentError, 'Table name must be of type String') unless |
| table_name_str.is_a?(String) |
| |
| # Table should exist |
| raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str) |
| |
| # There should be at least one argument |
      raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?
| |
| table_name = TableName.valueOf(table_name_str) |
| |
| # Get table descriptor |
| tdb = TableDescriptorBuilder.newBuilder(@admin.getDescriptor(table_name)) |
| hasTableUpdate = false |
| |
| # Process all args |
| args.each do |arg| |
| # Normalize args to support column name only alter specs |
| arg = { NAME => arg } if arg.is_a?(String) |
| |
| # Normalize args to support shortcut delete syntax |
| arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete'] |
| |
| # There are 3 possible options. |
| # 1) Column family spec. Distinguished by having a NAME and no METHOD. |
| method = arg.delete(METHOD) |
| if method.nil? && arg.key?(NAME) |
| descriptor = cfd(arg, tdb) |
| column_name = descriptor.getNameAsString |
| |
| # If column already exist, then try to alter it. Create otherwise. |
| if tdb.build.hasColumnFamily(column_name.to_java_bytes) |
| tdb.modifyColumnFamily(descriptor) |
| else |
| tdb.setColumnFamily(descriptor) |
| end |
| hasTableUpdate = true |
| next |
| end |
| |
| # 2) Method other than table_att, with some args. |
| name = arg.delete(NAME) |
| if !method.nil? && method != 'table_att' |
| # Delete column family |
| if method == 'delete' |
| raise(ArgumentError, 'NAME parameter missing for delete method') unless name |
| tdb.removeColumnFamily(name.to_java_bytes) |
| hasTableUpdate = true |
| # Unset table attributes |
| elsif method == 'table_att_unset' |
| raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name |
| if name.is_a?(Array) |
| name.each do |key| |
| if tdb.build.getValue(key).nil? |
| raise ArgumentError, "Could not find attribute: #{key}" |
| end |
| tdb.removeValue(key) |
| end |
| else |
| if tdb.build.getValue(name).nil? |
| raise ArgumentError, "Could not find attribute: #{name}" |
| end |
| tdb.removeValue(name) |
| end |
| hasTableUpdate = true |
| # Unset table configuration |
| elsif method == 'table_conf_unset' |
| raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name |
| if name.is_a?(Array) |
| name.each do |key| |
| if tdb.build.getValue(key).nil? |
| raise ArgumentError, "Could not find configuration: #{key}" |
| end |
| tdb.removeValue(key) |
| end |
| else |
| if tdb.build.getValue(name).nil? |
| raise ArgumentError, "Could not find configuration: #{name}" |
| end |
| tdb.removeValue(name) |
| end |
| hasTableUpdate = true |
| # Unknown method |
| else |
| raise ArgumentError, "Unknown method: #{method}" |
| end |
| |
| arg.each_key do |unknown_key| |
| puts(format('Unknown argument ignored: %s', unknown_key)) |
| end |
| |
| next |
| end |
| |
| # 3) Some args for the table, optionally with METHOD => table_att (deprecated) |
| update_tdb_from_arg(tdb, arg) |
| |
| # set a coprocessor attribute |
| valid_coproc_keys = [] |
| next unless arg.is_a?(Hash) |
| arg.each do |key, value| |
| k = String.new(key) # prepare to strip |
| k.strip! |
| |
          # Uses case-insensitive matching so we can accept lowercase 'coprocessor' for compatibility
| next unless k =~ /#{COPROCESSOR}/i |
| if value.is_a? String |
| # Specifying a coprocessor by this "spec string" is here for backwards compatibility |
| v = String.new value |
| v.strip! |
| cp = coprocessor_descriptor_from_spec_str v |
| elsif value.is_a? Hash |
| cp = coprocessor_descriptor_from_hash value |
| else |
| raise ArgumentError.new 'coprocessor must be provided as a String or Hash' |
| end |
| tdb.setCoprocessor cp |
| valid_coproc_keys << key |
| end |
| |
| valid_coproc_keys.each do |key| |
| arg.delete(key) |
| end |
| |
| hasTableUpdate = true |
| |
| arg.each_key do |unknown_key| |
| puts(format('Unknown argument ignored: %s', unknown_key)) |
| end |
| |
| next |
| end |
| |
| # Bulk apply all table modifications. |
| if hasTableUpdate |
| future = @admin.modifyTableAsync(tdb.build) |
| |
| if wait == true |
| puts 'Updating all regions with the new schema...' |
| future.get |
| end |
| end |
| end |
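    # Usage sketch (illustrative, mirroring the shell's alter syntax):
    #   admin.alter('t1', true, NAME => 'cf1', VERSIONS => 3)       # add or modify a family
    #   admin.alter('t1', true, METHOD => 'delete', NAME => 'cf2')  # drop a family
    #   admin.alter('t1', true, 'MAX_FILESIZE' => '134217728')      # set a table attribute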
| |
| def status(format, type) |
| cluster_metrics = @admin.getClusterMetrics |
| if format == 'detailed' |
| puts(format('version %s', cluster_metrics.getHBaseVersion)) |
| # Put regions in transition first because usually empty |
| puts(format('%d regionsInTransition', cluster_metrics.getRegionStatesInTransition.size)) |
| for v in cluster_metrics.getRegionStatesInTransition |
| puts(format(' %s', v)) |
| end |
| master = cluster_metrics.getMasterName |
| puts(format('active master: %s:%d %d', master.getHostname, master.getPort, |
| master.getStartcode)) |
| puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size)) |
| for server in cluster_metrics.getBackupMasterNames |
| puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode)) |
| end |
| |
| master_coprocs = @admin.getMasterCoprocessorNames.toString |
| unless master_coprocs.nil? |
| puts(format('master coprocessors: %s', master_coprocs)) |
| end |
| puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size)) |
| for server in cluster_metrics.getLiveServerMetrics.keySet |
| puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode)) |
| puts(format(' %s', cluster_metrics.getLiveServerMetrics.get(server).toString)) |
| for name, region in cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics |
| puts(format(' %s', region.getNameAsString.dump)) |
| puts(format(' %s', region.toString)) |
| end |
| end |
| puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size)) |
| for server in cluster_metrics.getDeadServerNames |
| puts(format(' %s', server)) |
| end |
| elsif format == 'replication' |
| puts(format('version %<version>s', version: cluster_metrics.getHBaseVersion)) |
| puts(format('%<servers>d live servers', |
| servers: cluster_metrics.getLiveServerMetrics.size)) |
| cluster_metrics.getLiveServerMetrics.keySet.each do |server_name| |
| sl = cluster_metrics.getLiveServerMetrics.get(server_name) |
| r_sink_string = ' SINK:' |
| r_source_string = ' SOURCE:' |
| r_load_sink = sl.getReplicationLoadSink |
| next if r_load_sink.nil? |
| if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted() |
| # If we have applied no operations since we've started replication, |
| # assume that we're not acting as a sink and don't print the normal information |
| r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s |
| r_sink_string << ", Waiting for OPs... " |
| else |
| r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s |
| r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s |
| r_sink_string << ", TimeStampsOfLastAppliedOp=" + |
| (java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString() |
| end |
| |
| r_load_source_map = sl.getReplicationLoadSourceMap |
| build_source_string(r_load_source_map, r_source_string) |
| puts(format(' %<host>s:', host: server_name.getHostname)) |
| if type.casecmp('SOURCE').zero? |
| puts(format('%<source>s', source: r_source_string)) |
| elsif type.casecmp('SINK').zero? |
| puts(format('%<sink>s', sink: r_sink_string)) |
| else |
| puts(format('%<source>s', source: r_source_string)) |
| puts(format('%<sink>s', sink: r_sink_string)) |
| end |
| end |
| elsif format == 'simple' |
| load = 0 |
| regions = 0 |
| master = cluster_metrics.getMasterName |
| puts(format('active master: %s:%d %d', master.getHostname, master.getPort, |
| master.getStartcode)) |
| puts(format('%d backup masters', cluster_metrics.getBackupMasterNames.size)) |
| for server in cluster_metrics.getBackupMasterNames |
| puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode)) |
| end |
| puts(format('%d live servers', cluster_metrics.getLiveServerMetrics.size)) |
| for server in cluster_metrics.getLiveServerMetrics.keySet |
| puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode)) |
| puts(format(' %s', cluster_metrics.getLiveServerMetrics.get(server).toString)) |
| load += cluster_metrics.getLiveServerMetrics.get(server).getRequestCountPerSecond |
| regions += cluster_metrics.getLiveServerMetrics.get(server).getRegionMetrics.size |
| end |
| puts(format('%d dead servers', cluster_metrics.getDeadServerNames.size)) |
| for server in cluster_metrics.getDeadServerNames |
| puts(format(' %s', server)) |
| end |
| puts(format('Aggregate load: %d, regions: %d', load, regions)) |
| else |
| puts "1 active master, #{cluster_metrics.getBackupMasterNames.size} backup masters, |
| #{cluster_metrics.getLiveServerMetrics.size} servers, |
| #{cluster_metrics.getDeadServerNames.size} dead, |
| #{format('%.4f', cluster_metrics.getAverageLoad)} average load" |
| end |
| end |
| |
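    # Helpers below assemble the human-readable replication SOURCE/SINK strings used by
    # the 'replication' format of #status above.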
| def build_source_string(r_load_source_map, r_source_string) |
| r_load_source_map.each do |peer, sources| |
| r_source_string << ' PeerID=' + peer |
| sources.each do |source_load| |
| build_queue_title(source_load, r_source_string) |
| build_running_source_stats(source_load, r_source_string) |
| end |
| end |
| end |
| |
| def build_queue_title(source_load, r_source_string) |
| r_source_string << if source_load.isRecovered |
| "\n Recovered Queue: " |
| else |
| "\n Normal Queue: " |
| end |
| r_source_string << source_load.getQueueId |
| end |
| |
| def build_running_source_stats(source_load, r_source_string) |
| if source_load.isRunning |
| build_shipped_stats(source_load, r_source_string) |
| build_load_general_stats(source_load, r_source_string) |
| r_source_string << ', Replication Lag=' + |
| source_load.getReplicationLag.to_s |
| else |
| r_source_string << "\n " |
        r_source_string << 'No Reader/Shipper threads running yet.'
| end |
| end |
| |
| def build_shipped_stats(source_load, r_source_string) |
| r_source_string << if source_load.getTimestampOfLastShippedOp.zero? |
| "\n " \ |
| 'No Ops shipped since last restart' |
| else |
| "\n AgeOfLastShippedOp=" + |
| source_load.getAgeOfLastShippedOp.to_s + |
| ', TimeStampOfLastShippedOp=' + |
| java.util.Date.new(source_load |
| .getTimestampOfLastShippedOp).toString |
| end |
| end |
| |
| def build_load_general_stats(source_load, r_source_string) |
| r_source_string << ', SizeOfLogQueue=' + |
| source_load.getSizeOfLogQueue.to_s |
| r_source_string << ', EditsReadFromLogQueue=' + |
| source_load.getEditsRead.to_s |
| r_source_string << ', OpsShippedToTarget=' + |
| source_load.getOPsShipped.to_s |
| build_edits_for_source(source_load, r_source_string) |
| end |
| |
| def build_edits_for_source(source_load, r_source_string) |
| if source_load.hasEditsSinceRestart |
| r_source_string << ', TimeStampOfNextToReplicate=' + |
| java.util.Date.new(source_load |
| .getTimeStampOfNextToReplicate).toString |
| else |
| r_source_string << ', No edits for this source' |
| r_source_string << ' since it started' |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # |
| # Helper methods |
| # |
| |
| # Does table exist? |
| def exists?(table_name) |
| @admin.tableExists(TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Is table enabled?
| def enabled?(table_name) |
| @admin.isTableEnabled(TableName.valueOf(table_name)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Return a new ColumnFamilyDescriptor made of passed args |
| def cfd(arg, tdb) |
| # String arg, single parameter constructor |
| return ColumnFamilyDescriptorBuilder.of(arg) if arg.is_a?(String) |
| |
| raise(ArgumentError, "Column family #{arg} must have a name") unless name = arg.delete(NAME) |
| |
| descriptor = tdb.build.getColumnFamily(name.to_java_bytes) |
| unless descriptor.nil? |
| cfdb = ColumnFamilyDescriptorBuilder.newBuilder(descriptor) |
| end |
| # create it if it's a new family |
| cfdb ||= ColumnFamilyDescriptorBuilder.newBuilder(name.to_java_bytes) |
| |
| cfdb.setBlockCacheEnabled(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::BLOCKCACHE))) if arg.include?(ColumnFamilyDescriptorBuilder::BLOCKCACHE) |
| cfdb.setScope(JInteger.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::REPLICATION_SCOPE))) if arg.include?(ColumnFamilyDescriptorBuilder::REPLICATION_SCOPE) |
| cfdb.setCacheDataOnWrite(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::CACHE_DATA_ON_WRITE))) if arg.include?(ColumnFamilyDescriptorBuilder::CACHE_DATA_ON_WRITE) |
| cfdb.setCacheIndexesOnWrite(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::CACHE_INDEX_ON_WRITE))) if arg.include?(ColumnFamilyDescriptorBuilder::CACHE_INDEX_ON_WRITE) |
| cfdb.setCacheBloomsOnWrite(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::CACHE_BLOOMS_ON_WRITE))) if arg.include?(ColumnFamilyDescriptorBuilder::CACHE_BLOOMS_ON_WRITE) |
| cfdb.setEvictBlocksOnClose(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::EVICT_BLOCKS_ON_CLOSE))) if arg.include?(ColumnFamilyDescriptorBuilder::EVICT_BLOCKS_ON_CLOSE) |
| cfdb.setInMemory(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::IN_MEMORY))) if arg.include?(ColumnFamilyDescriptorBuilder::IN_MEMORY) |
| if arg.include?(ColumnFamilyDescriptorBuilder::IN_MEMORY_COMPACTION) |
| cfdb.setInMemoryCompaction( |
| org.apache.hadoop.hbase.MemoryCompactionPolicy.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::IN_MEMORY_COMPACTION)) |
| ) |
| end |
| cfdb.setTimeToLive(arg.delete(ColumnFamilyDescriptorBuilder::TTL)) if arg.include?(ColumnFamilyDescriptorBuilder::TTL) |
| cfdb.setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::DATA_BLOCK_ENCODING))) if arg.include?(ColumnFamilyDescriptorBuilder::DATA_BLOCK_ENCODING) |
| cfdb.setBlocksize(arg.delete(ColumnFamilyDescriptorBuilder::BLOCKSIZE)) if arg.include?(ColumnFamilyDescriptorBuilder::BLOCKSIZE) |
| cfdb.setMaxVersions(JInteger.valueOf(arg.delete(HConstants::VERSIONS))) if arg.include?(HConstants::VERSIONS) |
| cfdb.setMinVersions(JInteger.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::MIN_VERSIONS))) if arg.include?(ColumnFamilyDescriptorBuilder::MIN_VERSIONS) |
| cfdb.setKeepDeletedCells(org.apache.hadoop.hbase.KeepDeletedCells.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::KEEP_DELETED_CELLS).to_s.upcase)) if arg.include?(ColumnFamilyDescriptorBuilder::KEEP_DELETED_CELLS) |
| cfdb.setCompressTags(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::COMPRESS_TAGS))) if arg.include?(ColumnFamilyDescriptorBuilder::COMPRESS_TAGS) |
| cfdb.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(ColumnFamilyDescriptorBuilder::PREFETCH_BLOCKS_ON_OPEN) |
| cfdb.setMobEnabled(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::IS_MOB))) if arg.include?(ColumnFamilyDescriptorBuilder::IS_MOB) |
| cfdb.setMobThreshold(JLong.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::MOB_THRESHOLD))) if arg.include?(ColumnFamilyDescriptorBuilder::MOB_THRESHOLD) |
| cfdb.setNewVersionBehavior(JBoolean.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::NEW_VERSION_BEHAVIOR))) if arg.include?(ColumnFamilyDescriptorBuilder::NEW_VERSION_BEHAVIOR) |
| if arg.include?(ColumnFamilyDescriptorBuilder::BLOOMFILTER) |
| bloomtype = arg.delete(ColumnFamilyDescriptorBuilder::BLOOMFILTER).upcase.to_sym |
| if org.apache.hadoop.hbase.regionserver.BloomType.constants.include?(bloomtype) |
| cfdb.setBloomFilterType(org.apache.hadoop.hbase.regionserver.BloomType.valueOf(bloomtype)) |
| else |
| raise(ArgumentError, "BloomFilter type #{bloomtype} is not supported. Use one of " + org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.join(' ')) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::COMPRESSION) |
| compression = arg.delete(ColumnFamilyDescriptorBuilder::COMPRESSION).upcase.to_sym |
| if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression) |
| cfdb.setCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression)) |
| else |
| raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' ')) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::ENCRYPTION) |
| algorithm = arg.delete(ColumnFamilyDescriptorBuilder::ENCRYPTION).upcase |
| cfdb.setEncryptionType(algorithm) |
| if arg.include?(ColumnFamilyDescriptorBuilder::ENCRYPTION_KEY) |
| key = org.apache.hadoop.hbase.io.crypto.Encryption.generateSecretKey( |
| @conf, algorithm, arg.delete(ColumnFamilyDescriptorBuilder::ENCRYPTION_KEY) |
| ) |
| cfdb.setEncryptionKey(org.apache.hadoop.hbase.security.EncryptionUtil.wrapKey(@conf, key, |
| algorithm)) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT) |
| compression = arg.delete(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT).upcase.to_sym |
| if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression) |
| cfdb.setCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression)) |
| else |
| raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' ')) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT_MAJOR) |
| compression = arg.delete(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT_MAJOR).upcase.to_sym |
| if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression) |
| cfdb.setMajorCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression)) |
| else |
| raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' ')) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT_MINOR) |
| compression = arg.delete(ColumnFamilyDescriptorBuilder::COMPRESSION_COMPACT_MINOR).upcase.to_sym |
| if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression) |
| cfdb.setMinorCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression)) |
| else |
| raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' ')) |
| end |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::STORAGE_POLICY) |
| storage_policy = arg.delete(ColumnFamilyDescriptorBuilder::STORAGE_POLICY).upcase |
| cfdb.setStoragePolicy(storage_policy) |
| end |
| if arg.include?(ColumnFamilyDescriptorBuilder::MOB_COMPACT_PARTITION_POLICY) |
| mob_partition_policy = arg.delete(ColumnFamilyDescriptorBuilder::MOB_COMPACT_PARTITION_POLICY).upcase.to_sym |
| if MobCompactPartitionPolicy.constants.include?(mob_partition_policy) |
| cfdb.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.valueOf(mob_partition_policy)) |
| else |
| raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + MobCompactPartitionPolicy.constants.join(' ')) |
| end |
| end |
| |
| set_user_metadata(cfdb, arg.delete(METADATA)) if arg[METADATA] |
| set_descriptor_config(cfdb, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] |
| if arg.include?(ColumnFamilyDescriptorBuilder::DFS_REPLICATION) |
| cfdb.setDFSReplication(JInteger.valueOf(arg.delete(ColumnFamilyDescriptorBuilder::DFS_REPLICATION))) |
| end |
| |
| arg.each_key do |unknown_key| |
| puts(format('Unknown argument ignored for column family %s: %s', name, unknown_key)) |
| end |
| |
| cfdb.build |
| end |
| |
| # Apply user metadata to table/column descriptor |
| def set_user_metadata(descriptor, metadata) |
| raise(ArgumentError, "#{METADATA} must be a Hash type") unless metadata.is_a?(Hash) |
| for k, v in metadata |
| v = v.to_s unless v.nil? |
| descriptor.setValue(k, v) |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Take a snapshot of specified table |
| def snapshot(table, snapshot_name, *args) |
| # Table name should be a string |
| raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String) |
| |
| # Snapshot name should be a string |
| raise(ArgumentError, 'Snapshot name must be of type String') unless |
| snapshot_name.is_a?(String) |
| |
| table_name = TableName.valueOf(table) |
| if args.empty? |
| @admin.snapshot(snapshot_name, table_name) |
| else |
| args.each do |arg| |
| ttl = arg[TTL] |
| ttl = ttl ? ttl.to_java(:long) : -1 |
| snapshot_props = java.util.HashMap.new |
| snapshot_props.put("TTL", ttl) |
| max_filesize = arg[MAX_FILESIZE] |
| max_filesize = max_filesize ? max_filesize.to_java(:long) : -1 |
| snapshot_props.put("MAX_FILESIZE", max_filesize) |
| if arg[SKIP_FLUSH] == true |
| @admin.snapshot(snapshot_name, table_name, |
| org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH, snapshot_props) |
| else |
| @admin.snapshot(snapshot_name, table_name, snapshot_props) |
| end |
| end |
| end |
| end |
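    # Usage sketch (illustrative; TTL is in seconds, SKIP_FLUSH skips the pre-snapshot flush):
    #   admin.snapshot('t1', 'snap1')
    #   admin.snapshot('t1', 'snap2', SKIP_FLUSH => true, TTL => 86_400)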
| |
| #---------------------------------------------------------------------------------------------- |
| # Restore specified snapshot |
| def restore_snapshot(snapshot_name, restore_acl = false) |
| conf = @connection.getConfiguration |
| take_fail_safe_snapshot = conf.getBoolean('hbase.snapshot.restore.take.failsafe.snapshot', false) |
| @admin.restoreSnapshot(snapshot_name, take_fail_safe_snapshot, restore_acl) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Create a new table by cloning the snapshot content |
| def clone_snapshot(snapshot_name, table, restore_acl = false) |
| @admin.cloneSnapshot(snapshot_name, TableName.valueOf(table), restore_acl) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Delete specified snapshot |
| def delete_snapshot(snapshot_name) |
| @admin.deleteSnapshot(snapshot_name) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Deletes the snapshots matching the given regex |
| def delete_all_snapshot(regex) |
| @admin.deleteSnapshots(Pattern.compile(regex)).to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Deletes the table snapshots matching the given regex |
| def delete_table_snapshots(tableNameRegex, snapshotNameRegex = '.*') |
| @admin.deleteTableSnapshots(Pattern.compile(tableNameRegex), |
| Pattern.compile(snapshotNameRegex)).to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of snapshots |
| def list_snapshot(regex = '.*') |
| @admin.listSnapshots(Pattern.compile(regex)).to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of table snapshots |
| def list_table_snapshots(tableNameRegex, snapshotNameRegex = '.*') |
| @admin.listTableSnapshots(Pattern.compile(tableNameRegex), |
| Pattern.compile(snapshotNameRegex)).to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns the whole ClusterMetrics containing details: |
| # |
| # hbase version |
| # cluster id |
| # primary/backup master(s) |
| # master's coprocessors |
| # live/dead regionservers |
| # balancer |
| # regions in transition |
| def getClusterMetrics |
| @admin.getClusterMetrics |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of regionservers |
| def getRegionServers |
      @admin.getClusterMetrics.getLiveServerMetrics.keySet.to_a
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns servername corresponding to passed server_name_string |
| def getServerName(server_name_string) |
| regionservers = getRegionServers |
| |
| if ServerName.isFullServerName(server_name_string) |
| return ServerName.valueOf(server_name_string) |
| else |
| name_list = server_name_string.split(',') |
| |
      regionservers.each do |sn|
| if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s)) |
| return sn |
| end |
| end |
| end |
| |
| return nil |
| end |
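    # Usage sketch (hypothetical host names):
    #   admin.getServerName('host1.example.com,16020,1600000000000')  # full server name form
    #   admin.getServerName('host1.example.com,16020')                # matched against live servers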
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of servernames |
| def getServerNames(servers, should_return_all_if_servers_empty) |
| regionservers = getRegionServers |
| servernames = [] |
| |
| if servers.empty? |
| # if no servers were specified as arguments, get a list of all servers |
| if should_return_all_if_servers_empty |
| servernames = regionservers |
| end |
| else |
      # Replace String entries in the servers array with ServerName objects
| i = 0 |
| while i < servers.length |
| server = servers[i] |
| |
| if ServerName.isFullServerName(server) |
| servernames.push(ServerName.valueOf(server)) |
| else |
| name_list = server.split(',') |
| j = 0 |
| while j < regionservers.length |
| sn = regionservers[j] |
| if name_list[0] == sn.hostname && (name_list[1].nil? ? true : (name_list[1] == sn.port.to_s)) |
| servernames.push(sn) |
| end |
| j += 1 |
| end |
| end |
| i += 1 |
| end |
| end |
| |
| servernames |
| end |
| |
| # Apply config specific to a table/column to its descriptor |
| def set_descriptor_config(descriptor, config) |
| raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.is_a?(Hash) |
| for k, v in config |
| v = v.to_s unless v.nil? |
| descriptor.setValue(k, v) |
| end |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Updates the configuration of one regionserver. |
| def update_config(serverName) |
| @admin.updateConfiguration(ServerName.valueOf(serverName)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Updates the configuration of all the regionservers. |
| def update_all_config |
| @admin.updateConfiguration |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns namespace's structure description |
| def describe_namespace(namespace_name) |
| namespace = @admin.getNamespaceDescriptor(namespace_name) |
| |
| return namespace.to_s unless namespace.nil? |
| |
| raise(ArgumentError, "Failed to find namespace named #{namespace_name}") |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of namespaces in hbase |
| def list_namespace(regex = '.*') |
| pattern = java.util.regex.Pattern.compile(regex) |
| list = @admin.listNamespaces |
| list.select { |s| pattern.match(s) } |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Returns a list of tables in namespace |
| def list_namespace_tables(namespace_name) |
| unless namespace_name.nil? |
| return @admin.listTableNamesByNamespace(namespace_name).map(&:getQualifierAsString) |
| end |
| |
| raise(ArgumentError, "Failed to find namespace named #{namespace_name}") |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Creates a namespace |
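    # Optional trailing hashes become namespace configuration; a hedged sketch
    # (names and values invented):
    #   create_namespace('my_ns')
    #   create_namespace('my_ns', 'PROPERTY_NAME' => 'property_value')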
| def create_namespace(namespace_name, *args) |
      # Fail if namespace name is not a string
| raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String) |
| |
| # Flatten params array |
| args = args.flatten.compact |
| |
      # Start defining the namespace
| nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(namespace_name) |
| args.each do |arg| |
| unless arg.is_a?(Hash) |
          raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type")
| end |
        arg.each do |k, v|
          v = v.to_s unless v.nil?
          nsb.addConfiguration(k, v)
        end
| end |
| @admin.createNamespace(nsb.build) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Modifies a namespace
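    # The METHOD key chooses 'set' or 'unset'; a hedged sketch (names invented):
    #   alter_namespace('my_ns', METHOD => 'set', 'PROPERTY_NAME' => 'property_value')
    #   alter_namespace('my_ns', METHOD => 'unset', NAME => 'PROPERTY_NAME')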
| def alter_namespace(namespace_name, *args) |
| # Fail if namespace name is not a string |
| raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String) |
| |
| nsd = @admin.getNamespaceDescriptor(namespace_name) |
| |
| raise(ArgumentError, 'Namespace does not exist') unless nsd |
| nsb = org.apache.hadoop.hbase.NamespaceDescriptor.create(nsd) |
| |
| # Flatten params array |
| args = args.flatten.compact |
| |
      # Apply the requested namespace alterations
| args.each do |arg| |
| unless arg.is_a?(Hash) |
| raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash type") |
| end |
| method = arg[METHOD] |
| if method == 'unset' |
| nsb.removeConfiguration(arg[NAME]) |
| elsif method == 'set' |
| arg.delete(METHOD) |
          arg.each do |k, v|
            v = v.to_s unless v.nil?
            nsb.addConfiguration(k, v)
          end
| else |
| raise(ArgumentError, "Unknown method #{method}") |
| end |
| end |
| @admin.modifyNamespace(nsb.build) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Get namespace's rsgroup |
| def get_namespace_rsgroup(namespace_name) |
| # Fail if namespace name is not a string |
| raise(ArgumentError, 'Namespace name must be of type String') unless namespace_name.is_a?(String) |
| nsd = @admin.getNamespaceDescriptor(namespace_name) |
| raise(ArgumentError, 'Namespace does not exist') unless nsd |
      nsd.getConfigurationValue('hbase.rsgroup.name')
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Drops a namespace
| def drop_namespace(namespace_name) |
| @admin.deleteNamespace(namespace_name) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Get security capabilities |
| def get_security_capabilities |
| @admin.getSecurityCapabilities |
| end |
| |
| # List all procedures |
| def list_procedures |
| @admin.getProcedures |
| end |
| |
| # List all locks |
| def list_locks |
| @admin.getLocks |
| end |
| |
| # Parse arguments and update TableDescriptorBuilder accordingly |
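    # The arg hash is keyed by the TableDescriptorBuilder constants referenced below;
    # a hedged sketch (values invented):
    #   update_tdb_from_arg(tdb, 'MAX_FILESIZE' => '134217728', 'READONLY' => 'true')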
| # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity |
| def update_tdb_from_arg(tdb, arg) |
| tdb.setMaxFileSize(arg.delete(TableDescriptorBuilder::MAX_FILESIZE)) if arg.include?(TableDescriptorBuilder::MAX_FILESIZE) |
| tdb.setReadOnly(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::READONLY))) if arg.include?(TableDescriptorBuilder::READONLY) |
| tdb.setCompactionEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::COMPACTION_ENABLED))) if arg.include?(TableDescriptorBuilder::COMPACTION_ENABLED) |
| tdb.setSplitEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::SPLIT_ENABLED))) if arg.include?(TableDescriptorBuilder::SPLIT_ENABLED) |
| tdb.setMergeEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::MERGE_ENABLED))) if arg.include?(TableDescriptorBuilder::MERGE_ENABLED) |
| tdb.setNormalizationEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::NORMALIZATION_ENABLED))) if arg.include?(TableDescriptorBuilder::NORMALIZATION_ENABLED) |
| tdb.setNormalizerTargetRegionCount(JInteger.valueOf(arg.delete(TableDescriptorBuilder::NORMALIZER_TARGET_REGION_COUNT))) if arg.include?(TableDescriptorBuilder::NORMALIZER_TARGET_REGION_COUNT) |
| tdb.setNormalizerTargetRegionSize(JLong.valueOf(arg.delete(TableDescriptorBuilder::NORMALIZER_TARGET_REGION_SIZE))) if arg.include?(TableDescriptorBuilder::NORMALIZER_TARGET_REGION_SIZE) |
| tdb.setMemStoreFlushSize(arg.delete(TableDescriptorBuilder::MEMSTORE_FLUSHSIZE)) if arg.include?(TableDescriptorBuilder::MEMSTORE_FLUSHSIZE) |
| tdb.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(TableDescriptorBuilder::DURABILITY))) if arg.include?(TableDescriptorBuilder::DURABILITY) |
| tdb.setPriority(JInteger.valueOf(arg.delete(TableDescriptorBuilder::PRIORITY))) if arg.include?(TableDescriptorBuilder::PRIORITY) |
| tdb.setFlushPolicyClassName(arg.delete(TableDescriptorBuilder::FLUSH_POLICY)) if arg.include?(TableDescriptorBuilder::FLUSH_POLICY) |
| tdb.setRegionMemStoreReplication(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION) |
| tdb.setRegionSplitPolicyClassName(arg.delete(TableDescriptorBuilder::SPLIT_POLICY)) if arg.include?(TableDescriptorBuilder::SPLIT_POLICY) |
| tdb.setRegionReplication(JInteger.valueOf(arg.delete(TableDescriptorBuilder::REGION_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_REPLICATION) |
| set_user_metadata(tdb, arg.delete(METADATA)) if arg[METADATA] |
| set_descriptor_config(tdb, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] |
| end |
| # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity |
| |
| #---------------------------------------------------------------------------------------------- |
    # Clear compaction queues
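    # queue_name may be nil (both queues), 'long', 'short', or an Array of those;
    # an illustrative call (server string invented):
    #   clear_compaction_queues('rs1.example.com,16020,1700000000000', 'short')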
| def clear_compaction_queues(server_name, queue_name = nil) |
      names = %w[long short]
      queues = java.util.HashSet.new
      if queue_name.nil?
        queues.add('long')
        queues.add('short')
      elsif queue_name.is_a?(String)
        raise(ArgumentError, "Unknown queue name #{queue_name}") unless names.include?(queue_name)
        queues.add(queue_name)
      elsif queue_name.is_a?(Array)
        queue_name.each do |s|
          raise(ArgumentError, "Unknown queue name #{s}") unless names.include?(s)
          queues.add(s)
        end
      else
        raise(ArgumentError, "#{queue_name.class} of #{queue_name.inspect} is not of String/Array type")
      end
| @admin.clearCompactionQueues(ServerName.valueOf(server_name), queues) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # List dead region servers
| def list_deadservers |
| @admin.listDeadServers.to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Clear dead region servers
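    # Pass specific 'host,port,startcode' strings, or an empty list to clear every dead
    # server; a hedged sketch (name invented):
    #   clear_deadservers(['rs1.example.com,16020,1700000000000'])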
| def clear_deadservers(dead_servers) |
| # Flatten params array |
| dead_servers = dead_servers.flatten.compact |
| if dead_servers.empty? |
| servers = list_deadservers |
| else |
| servers = java.util.ArrayList.new |
| dead_servers.each do |s| |
| servers.add(ServerName.valueOf(s)) |
| end |
| end |
| @admin.clearDeadServers(servers).to_a |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # List live region servers |
| def list_liveservers |
| @admin.getClusterMetrics.getLiveServerMetrics.keySet.to_a |
| end |
| |
| #--------------------------------------------------------------------------- |
    # Create a new table by cloning the existing table schema.
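    # Illustrative call (table names invented); pass preserve_splits = false to drop the
    # source table's split points:
    #   clone_table_schema('source_table', 'dest_table', false)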
| def clone_table_schema(table_name, new_table_name, preserve_splits = true) |
| @admin.cloneTableSchema(TableName.valueOf(table_name), |
| TableName.valueOf(new_table_name), |
| preserve_splits) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # List decommissioned RegionServers |
| def list_decommissioned_regionservers |
| @admin.listDecommissionedRegionServers |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Retrieve SlowLog Responses from RegionServers |
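    # A hedged sketch (values invented): '*' targets every live RegionServer, and args
    # accepts 'LIMIT' plus the filter keys parsed in get_filter_params below:
    #   get_slowlog_responses('*', 'TABLE_NAME' => 't1', 'LIMIT' => 50)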
| def get_slowlog_responses(server_names, args, is_large_log = false) |
| unless server_names.is_a?(Array) || server_names.is_a?(String) |
| raise(ArgumentError, |
| "#{server_names.class} of #{server_names.inspect} is not of Array/String type") |
| end |
| if server_names == '*' |
| server_names = getServerNames([], true) |
| else |
| server_names_list = to_server_names(server_names) |
| server_names = getServerNames(server_names_list, false) |
| end |
| filter_params = get_filter_params(args) |
      limit = args.fetch('LIMIT', 10)
      log_type = is_large_log ? 'LARGE_LOG' : 'SLOW_LOG'
| log_dest = org.apache.hadoop.hbase.client.ServerType::REGION_SERVER |
| server_names_set = java.util.HashSet.new(server_names) |
| slow_log_responses = @admin.getLogEntries(server_names_set, log_type, log_dest, limit, |
| filter_params) |
      slow_log_responses.map(&:toJsonPrettyPrint)
| end |
| |
| def get_filter_params(args) |
| filter_params = java.util.HashMap.new |
| if args.key? 'REGION_NAME' |
| region_name = args['REGION_NAME'] |
| filter_params.put('regionName', region_name) |
| end |
| if args.key? 'TABLE_NAME' |
| table_name = args['TABLE_NAME'] |
| filter_params.put('tableName', table_name) |
| end |
| if args.key? 'CLIENT_IP' |
| client_address = args['CLIENT_IP'] |
| filter_params.put('clientAddress', client_address) |
| end |
| if args.key? 'USER' |
| user = args['USER'] |
| filter_params.put('userName', user) |
| end |
      if args.key? 'FILTER_BY_OP'
        filter_by_op = args['FILTER_BY_OP']
        unless %w[OR AND].include?(filter_by_op)
          raise(ArgumentError, 'FILTER_BY_OP must be either OR or AND')
        end
        filter_params.put('filterByOperator', 'AND') if filter_by_op == 'AND'
      end
| filter_params |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Clears SlowLog Responses from RegionServers |
| def clear_slowlog_responses(server_names) |
| unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String) |
| raise(ArgumentError, |
| "#{server_names.class} of #{server_names.inspect} is not of correct type") |
| end |
| if server_names.nil? |
| server_names = getServerNames([], true) |
| else |
| server_names_list = to_server_names(server_names) |
| server_names = getServerNames(server_names_list, false) |
| end |
| clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names)) |
      clear_log_success_count = clear_log_responses.count { |response| response }
| puts 'Cleared Slowlog responses from ' \ |
| "#{clear_log_success_count}/#{clear_log_responses.size} RegionServers" |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Decommission a list of region servers, optionally offload corresponding regions |
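    # Illustrative call (hostname invented); should_offload may be a boolean or a string:
    #   decommission_regionservers(['rs1.example.com,16020'], true)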
| def decommission_regionservers(host_or_servers, should_offload) |
| # Fail if host_or_servers is neither a string nor an array |
| unless host_or_servers.is_a?(Array) || host_or_servers.is_a?(String) |
| raise(ArgumentError, |
| "#{host_or_servers.class} of #{host_or_servers.inspect} is not of Array/String type") |
| end |
| |
| # Fail if should_offload is neither a TrueClass/FalseClass nor a string |
| unless (!!should_offload == should_offload) || should_offload.is_a?(String) |
| raise(ArgumentError, "#{should_offload} is not a boolean value") |
| end |
| |
| # If a string is passed, convert it to an array |
      _host_or_servers = if host_or_servers.is_a?(Array)
                           host_or_servers
                         else
                           java.util.Arrays.asList(host_or_servers)
                         end
| |
| # Retrieve the server names corresponding to passed _host_or_servers list |
| server_names = getServerNames(_host_or_servers, false) |
| |
      # Fail if we cannot find any server(s) corresponding to the passed host_or_servers
| if server_names.empty? |
| raise(ArgumentError, |
| "Could not find any server(s) with specified name(s): #{host_or_servers}") |
| end |
| |
| @admin.decommissionRegionServers(server_names, |
| java.lang.Boolean.valueOf(should_offload)) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Recommission a region server, optionally load a list of passed regions |
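    # Illustrative call (server string and encoded region name invented):
    #   recommission_regionserver('rs1.example.com,16020,1700000000000',
    #                             ['d1f1a2cbdd84dbf99a6b4c0e9f4c2a6d'])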
| def recommission_regionserver(server_name_string, encoded_region_names) |
| # Fail if server_name_string is not a string |
| unless server_name_string.is_a?(String) |
| raise(ArgumentError, |
| "#{server_name_string.class} of #{server_name_string.inspect} is not of String type") |
| end |
| |
| # Fail if encoded_region_names is not an array |
| unless encoded_region_names.is_a?(Array) |
| raise(ArgumentError, |
| "#{encoded_region_names.class} of #{encoded_region_names.inspect} is not of Array type") |
| end |
| |
| # Convert encoded_region_names from string to bytes (element-wise) |
      region_names_in_bytes = encoded_region_names
                              .map { |region_name| region_name.to_java_bytes }
                              .compact
| |
| # Retrieve the server name corresponding to the passed server_name_string |
| server_name = getServerName(server_name_string) |
| |
      # Fail if we cannot find a server corresponding to the passed server_name_string
| if server_name.nil? |
| raise(ArgumentError, |
| "Could not find any server with name #{server_name_string}") |
| end |
| |
| @admin.recommissionRegionServer(server_name, region_names_in_bytes) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Retrieve latest balancer decisions made by LoadBalancers |
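    # 'LIMIT' caps how many decision records are returned (default 250); e.g.:
    #   get_balancer_decisions('LIMIT' => 100)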
| def get_balancer_decisions(args) |
      limit = args.fetch('LIMIT', 250)
| log_type = 'BALANCER_DECISION' |
| log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER |
| balancer_decisions_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil) |
      balancer_decisions_responses.map(&:toJsonPrettyPrint)
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Retrieve latest balancer rejections made by LoadBalancers |
| def get_balancer_rejections(args) |
      limit = args.fetch('LIMIT', 250)
| |
| log_type = 'BALANCER_REJECTION' |
| log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER |
| balancer_rejections_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil) |
      balancer_rejections_responses.map(&:toJsonPrettyPrint)
| end |
| |
| #---------------------------------------------------------------------------------------------- |
| # Stop the active Master |
| def stop_master |
| @admin.stopMaster |
| end |
| |
| # Stop the given RegionServer |
| def stop_regionserver(hostport) |
| @admin.stopRegionServer(hostport) |
| end |
| |
| #---------------------------------------------------------------------------------------------- |
    # Returns the server names as a list, wrapping a single String when necessary
| def to_server_names(server_names) |
| if server_names.is_a?(Array) |
| server_names |
| else |
| java.util.Arrays.asList(server_names) |
| end |
| end |
| end |
| # rubocop:enable Metrics/ClassLength |
| end |