#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
include Java

# Wrapper for org.apache.hadoop.hbase.client.HTable
module Hbase
  class Table
    include HBaseConstants

    # Note: the formatter argument is accepted for interface compatibility
    # with the shell commands but is not used by this wrapper
    def initialize(configuration, table_name, formatter)
      @table = org.apache.hadoop.hbase.client.HTable.new(configuration, table_name)
    end
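    # A usage sketch (an illustration, not part of the original file; assumes
    # the shell's JRuby runtime and an existing table 't1'):
    #   conf  = org.apache.hadoop.hbase.HBaseConfiguration.create
    #   table = Hbase::Table.new(conf, 't1', nil)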
    #----------------------------------------------------------------------------------------------
    # Put a cell 'value' at specified table/row/column
    def put(row, column, value, timestamp = nil)
      p = org.apache.hadoop.hbase.client.Put.new(row.to_s.to_java_bytes)
      family, qualifier = parse_column_name(column)
      if timestamp
        p.add(family, qualifier, timestamp, value.to_s.to_java_bytes)
      else
        p.add(family, qualifier, value.to_s.to_java_bytes)
      end
      @table.put(p)
    end
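    # A usage sketch (assuming `t` is a Table instance; columns are named in
    # 'family:qualifier' form and timestamps are milliseconds):
    #   t.put('r1', 'cf:q', 'v1')              # write at the server's current time
    #   t.put('r1', 'cf:q', 'v0', 1294519523)  # write at an explicit timestamp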
    #----------------------------------------------------------------------------------------------
    # Delete a cell
    def delete(row, column, timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP)
      deleteall(row, column, timestamp)
    end
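    # A usage sketch (assuming `t` is a Table instance; the delete affects all
    # versions of the cell at or older than the given timestamp):
    #   t.delete('r1', 'cf:q')              # delete every version of the cell
    #   t.delete('r1', 'cf:q', 1294519523)  # delete versions up to the timestamp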
    #----------------------------------------------------------------------------------------------
    # Delete a whole row, or just one of its columns when a column is given
    def deleteall(row, column = nil, timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP)
      d = org.apache.hadoop.hbase.client.Delete.new(row.to_s.to_java_bytes, timestamp, nil)
      if column
        family, qualifier = parse_column_name(column)
        d.deleteColumns(family, qualifier, timestamp)
      end
      @table.delete(d)
    end
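    # A usage sketch (assuming `t` is a Table instance):
    #   t.deleteall('r1')          # delete the entire row
    #   t.deleteall('r1', 'cf:q')  # delete a single column of the row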
    #----------------------------------------------------------------------------------------------
    # Increment a counter atomically
    def incr(row, column, value = nil)
      value ||= 1
      family, qualifier = parse_column_name(column)
      @table.incrementColumnValue(row.to_s.to_java_bytes, family, qualifier, value)
    end
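    # A usage sketch (assuming `t` is a Table instance; the server stores the
    # counter as an 8-byte long, so read it back with get_counter, not get):
    #   t.incr('r1', 'cf:hits')      # increment by the default of 1
    #   t.incr('r1', 'cf:hits', 10)  # increment by 10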
    #----------------------------------------------------------------------------------------------
    # Count rows in a table
    def count(interval = 1000, caching_rows = 10)
      # Scanner caching can safely be raised here because the FirstKeyOnlyFilter
      # returns only the first KeyValue of each row, keeping rows small
      scan = org.apache.hadoop.hbase.client.Scan.new
      scan.cache_blocks = false
      scan.caching = caching_rows
      scan.setFilter(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new)

      # Run the scanner
      scanner = @table.getScanner(scan)
      count = 0
      iter = scanner.iterator

      # Iterate results
      while iter.hasNext
        row = iter.next
        count += 1
        next unless block_given? && count % interval == 0
        # Allow command modules to visualize the counting process
        yield(count, String.from_java_bytes(row.getRow))
      end

      # Return the counter
      return count
    end
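    # A usage sketch (assuming `t` is a Table instance; the optional block is
    # called every `interval` rows with the running count and the current key):
    #   total = t.count
    #   total = t.count(500) { |cnt, key| puts "#{cnt}: #{key}" }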
    #----------------------------------------------------------------------------------------------
    # Get from table
    def get(row, *args)
      get = org.apache.hadoop.hbase.client.Get.new(row.to_s.to_java_bytes)
      maxlength = -1

      # Normalize args
      args = args.first if args.first.kind_of?(Hash)
      if args.kind_of?(String) || args.kind_of?(Array)
        columns = [ args ].flatten.compact
        args = { COLUMNS => columns }
      end

      #
      # Parse arguments
      #
      unless args.kind_of?(Hash)
        raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}"
      end

      # Get the maxlength parameter if passed
      maxlength = args.delete(MAXLENGTH) if args[MAXLENGTH]

      unless args.empty?
        columns = args[COLUMN] || args[COLUMNS]
        if columns
          # Normalize types: convert a single string to an array of strings
          columns = [ columns ] if columns.is_a?(String)

          # At this point it is either an array or an unsupported type
          unless columns.kind_of?(Array)
            raise ArgumentError, "Failed to parse column argument #{columns.inspect}, #{columns.class}; expected a String or an Array"
          end

          # Get each column name and add it to the get request
          columns.each do |column|
            family, qualifier = parse_column_name(column.to_s)
            if qualifier
              get.addColumn(family, qualifier)
            else
              get.addFamily(family)
            end
          end

          # Additional params
          get.setMaxVersions(args[VERSIONS] || 1)
          get.setTimeStamp(args[TIMESTAMP]) if args[TIMESTAMP]
        else
          # May have passed TIMESTAMP and row only; wants all columns from ts
          unless ts = args[TIMESTAMP]
            raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}"
          end

          # Set the timestamp
          get.setTimeStamp(ts.to_i)
        end
      end

      # Call hbase for the results
      result = @table.get(get)
      return nil if result.isEmpty

      # Print out results.  Result can be Cell or RowResult.
      res = {}
      result.list.each do |kv|
        family = String.from_java_bytes(kv.getFamily)
        qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getQualifier)

        column = "#{family}:#{qualifier}"
        value = to_string(column, kv, maxlength)

        if block_given?
          yield(column, value)
        else
          res[column] = value
        end
      end

      # If block given, we've yielded all the results, otherwise just return them
      return ((block_given?) ? nil : res)
    end
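    # A usage sketch of the accepted argument shapes (assuming `t` is a Table
    # instance; COLUMNS, VERSIONS etc. come from HBaseConstants):
    #   t.get('r1')                                      # whole row
    #   t.get('r1', 'cf:q')                              # single column
    #   t.get('r1', ['cf:a', 'cf:b'])                    # several columns
    #   t.get('r1', COLUMNS => ['cf:q'], VERSIONS => 3)  # multiple versions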
    #----------------------------------------------------------------------------------------------
    # Fetches and decodes a counter value from hbase
    def get_counter(row, column)
      family, qualifier = parse_column_name(column.to_s)

      # Format get request
      get = org.apache.hadoop.hbase.client.Get.new(row.to_s.to_java_bytes)
      get.addColumn(family, qualifier)
      get.setMaxVersions(1)

      # Call hbase
      result = @table.get(get)
      return nil if result.isEmpty

      # Fetch cell value
      cell = result.list.first
      org.apache.hadoop.hbase.util.Bytes::toLong(cell.getValue)
    end
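    # A usage sketch (assuming `t` is a Table instance and 'cf:hits' was
    # written by incr, i.e. it holds an 8-byte long):
    #   t.incr('r1', 'cf:hits', 5)
    #   t.get_counter('r1', 'cf:hits')  # => 5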
    #----------------------------------------------------------------------------------------------
    # Scans the whole table, or a range of keys, and returns the rows matching specific criteria
    def scan(args = {})
      unless args.kind_of?(Hash)
        raise ArgumentError, "Arguments should be a hash. Failed to parse #{args.inspect}, #{args.class}"
      end

      limit = args.delete("LIMIT") || -1
      maxlength = args.delete("MAXLENGTH") || -1

      if args.any?
        filter = args["FILTER"]
        startrow = args["STARTROW"] || ''
        stoprow = args["STOPROW"]
        timestamp = args["TIMESTAMP"]
        columns = args["COLUMNS"] || args["COLUMN"] || get_all_columns
        # Honor an explicit CACHE_BLOCKS => false; `args["CACHE_BLOCKS"] || true`
        # would silently turn false back into true
        cache = args.include?("CACHE_BLOCKS") ? args["CACHE_BLOCKS"] : true
        versions = args["VERSIONS"] || 1

        # Normalize column names
        columns = [columns] if columns.class == String
        unless columns.kind_of?(Array)
          raise ArgumentError.new("COLUMNS must be specified as a String or an Array")
        end

        scan = if stoprow
          org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes)
        else
          org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes)
        end

        columns.each { |c| scan.addColumns(c) }
        scan.setFilter(filter) if filter
        scan.setTimeStamp(timestamp) if timestamp
        scan.setCacheBlocks(cache)
        scan.setMaxVersions(versions) if versions > 1
      else
        scan = org.apache.hadoop.hbase.client.Scan.new
      end

      # Start the scanner
      scanner = @table.getScanner(scan)
      count = 0
      res = {}
      iter = scanner.iterator

      # Iterate results
      while iter.hasNext
        break if limit > 0 && count >= limit

        row = iter.next
        key = org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow)

        row.list.each do |kv|
          family = String.from_java_bytes(kv.getFamily)
          qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getQualifier)

          column = "#{family}:#{qualifier}"
          cell = to_string(column, kv, maxlength)

          if block_given?
            yield(key, "column=#{column}, #{cell}")
          else
            res[key] ||= {}
            res[key][column] = cell
          end
        end

        # One more row processed
        count += 1
      end

      return ((block_given?) ? count : res)
    end
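    # A usage sketch (assuming `t` is a Table instance; keys here are plain
    # string literals, matching how this method reads its argument hash):
    #   t.scan                                                        # full table
    #   t.scan("STARTROW" => 'r1', "STOPROW" => 'r9', "LIMIT" => 10)  # key range
    #   t.scan("COLUMNS" => ['cf:q'], "CACHE_BLOCKS" => false)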
    #----------------------------------------------------------------------------------------
    # Helper methods

    # Returns a list of column names in the table
    def get_all_columns
      @table.table_descriptor.getFamilies.map do |family|
        "#{family.getNameAsString}:"
      end
    end

    # Checks if the current table is one of the 'meta' tables
    def is_meta_table?
      tn = @table.table_name
      org.apache.hadoop.hbase.util.Bytes.equals(tn, org.apache.hadoop.hbase.HConstants::META_TABLE_NAME) ||
        org.apache.hadoop.hbase.util.Bytes.equals(tn, org.apache.hadoop.hbase.HConstants::ROOT_TABLE_NAME)
    end

    # Returns the family and (when the column name contains one) the qualifier
    def parse_column_name(column)
      split = org.apache.hadoop.hbase.KeyValue.parseColumn(column.to_java_bytes)
      return split[0], (split.length > 1) ? split[1] : nil
    end
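    # A behavior sketch (assuming KeyValue.parseColumn splits on the first ':'
    # and returns a single-element array when no delimiter is present):
    #   parse_column_name('cf:q')  # => [bytes('cf'), bytes('q')]
    #   parse_column_name('cf')    # => [bytes('cf'), nil]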
    # Make a String of the passed kv
    # Intercept cells whose format we know, such as the info:regioninfo in .META.
    def to_string(column, kv, maxlength = -1)
      if is_meta_table?
        if column == 'info:regioninfo' or column == 'info:splitA' or column == 'info:splitB'
          hri = org.apache.hadoop.hbase.util.Writables.getHRegionInfoOrNull(kv.getValue)
          # getHRegionInfoOrNull may return nil for an empty or unparseable value
          return "timestamp=%d, value=%s" % [kv.getTimestamp, hri ? hri.toString : '']
        end
        if column == 'info:serverstartcode'
          if kv.getValue.length > 0
            str_val = org.apache.hadoop.hbase.util.Bytes.toLong(kv.getValue)
          else
            str_val = org.apache.hadoop.hbase.util.Bytes.toStringBinary(kv.getValue)
          end
          return "timestamp=%d, value=%s" % [kv.getTimestamp, str_val]
        end
      end

      val = "timestamp=#{kv.getTimestamp}, value=#{org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getValue)}"
      (maxlength != -1) ? val[0, maxlength] : val
    end
  end
end