Gobblin Hive distcp is built on top of Gobblin distcp. It uses the Hive metastore to find datasets to copy, then performs regular file listings to find the actual files to copy. After the copy finishes, the Hive registrations in the source are replicated on the target.

This document shows a sample job config for Gobblin Hive distcp and explains how it works.
Below is a sample job config for Gobblin Hive distcp. The Gobblin job constructs and data flow are the same as in Gobblin distcp; the only differences are `gobblin.dataset.profile.class` and the Hive-related properties.
```properties
job.name=SampleHiveDistcp
job.group=HiveDistcp
job.description=Sample job config for hive distcp

extract.namespace=org.apache.gobblin.copy.tracking

gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder
data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
source.class=org.apache.gobblin.data.management.copy.CopySource
writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
converter.classes=org.apache.gobblin.converter.IdentityConverter

hive.dataset.copy.target.table.prefixToBeReplaced=
hive.dataset.copy.target.table.prefixReplacement=
data.publisher.final.dir=${hive.dataset.copy.target.table.prefixReplacement}

hive.dataset.hive.metastore.uri=
hive.dataset.copy.target.metastore.uri=
hive.dataset.whitelist=
hive.dataset.copy.target.database=

hive.dataset.existing.entity.conflict.policy=REPLACE_PARTITIONS
hive.dataset.copy.deregister.fileDeleteMethod=NO_DELETE
hive.dataset.copy.location.listing.method=RECURSIVE
hive.dataset.copy.locations.listing.skipHiddenPaths=true
gobblin.copy.preserved.attributes=rgbp
```
`hive.dataset.hive.metastore.uri` and `hive.dataset.copy.target.metastore.uri` specify the source and target metastore URIs. Make sure the Hive distcp job has access to both Hive metastores.
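For example, a job copying between two clusters might set these keys as follows (the hostnames are hypothetical placeholders; the standard Hive metastore thrift URI format applies):

```properties
# Source and target Hive metastores (example hostnames; 9083 is the default metastore port)
hive.dataset.hive.metastore.uri=thrift://source-metastore.example.com:9083
hive.dataset.copy.target.metastore.uri=thrift://target-metastore.example.com:9083
```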
Use a whitelist, and optionally a blacklist, to specify the tables to copy, using the keys `hive.dataset.whitelist` and `hive.dataset.blacklist`. Both whitelist and blacklist accept various patterns, for example:

* `sampleDb.sampleTable`: the table `sampleTable` in database `sampleDb`
* `sampleDb`: all tables in database `sampleDb`
* `sampleDb.sample*`: all tables in database `sampleDb` whose names start with `sample`

The key `hive.dataset.copy.target.database` specifies the target database to create tables under. If omitted, the source database name is used.
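As a concrete sketch using the pattern forms above (the database and table names are hypothetical):

```properties
# Copy all of db1 plus the tables in db2 whose names start with "events",
# but skip db1.tmp_table; register everything under database backupDb
hive.dataset.whitelist=db1,db2.events*
hive.dataset.blacklist=db1.tmp_table
hive.dataset.copy.target.database=backupDb
```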
Several keys control where copied files are placed. There are a few options for how the target paths are computed:

* Prefix replacement: set the keys `hive.dataset.copy.target.table.prefixToBeReplaced` and `hive.dataset.copy.target.table.prefixReplacement`. Any path that is not a descendant of `prefixToBeReplaced` will throw an error and fail the dataset. Note that setting both keys to `/` effectively replicates all paths exactly.
* New table root: set the key `hive.dataset.copy.target.table.root` to specify the replacement. Note there is some primitive token replacement in the value of this key: the tokens `$DB` and `$TABLE` are replaced by the database and table name respectively. If the token `$TABLE` is not present, however, the table name will be automatically appended to the new table root.
* Relocate data files: the target paths are derived from `hive.dataset.copy.target.table.root` and processed with the token replacements explained in "new table root". To enable this mode, set `hive.dataset.copy.relocate.data.files` to true and set `hive.dataset.copy.target.table.root` appropriately.

If distcp-ng finds that a partition or table it needs to create already exists, it determines whether the existing table / partition is identical to what it would register (e.g. by comparing schema, location, etc.). If not, it uses a policy to decide how to proceed. The policy is specified using the key `hive.dataset.existing.entity.conflict.policy` and can take values such as `ABORT` (do not copy the conflicting table / partition and raise an error) or `REPLACE_PARTITIONS` (deregister the conflicting partitions and register the copied ones instead, as in the sample config above).
Sometimes distcp-ng must deregister a table / partition, for example if it doesn't exist in the source, or if it must be replaced. In this case, distcp-ng offers options on what to do with the files under the deregistered partition. Set this policy using the key `hive.dataset.copy.deregister.fileDeleteMethod`, which can take values such as `NO_DELETE` (leave the files in place, as in the sample config above) or `RECURSIVE` (delete the deregistered location recursively).
To specify the files that distcp will copy for each table / partition, use the key `hive.dataset.copy.location.listing.method`, which can take the values `RECURSIVE` (recursively list all files under the table / partition location, as in the sample config above) or `INPUT_FORMAT` (use the input format declared in the Hive registration to determine the files to copy).
The key `hive.dataset.copy.locations.listing.skipHiddenPaths`, if set to true, prevents hidden files from being copied.

A partition filter can be applied when copying partitioned tables. Filters can only be applied to text partition columns. To specify a partition filter, use the key `hive.dataset.copy.partition.filter.generator`.
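For instance, a lookback-style filter might be configured as follows (the column name, lookback value, and date format are illustrative placeholders, and the exact lookback syntax may vary by Gobblin version):

```properties
# Only copy partitions whose "datepartition" value falls within the last 3 days
hive.dataset.copy.partition.filter.generator=gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator
hive.dataset.partition.filter.datetime.column=datepartition
hive.dataset.partition.filter.datetime.lookback=3d
hive.dataset.partition.filter.datetime.format=yyyy-MM-dd
```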
* `gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator`: filters date-representing partitions by a lookback (i.e. only copies recent partitions). Use the keys `hive.dataset.partition.filter.datetime.column`, `hive.dataset.partition.filter.datetime.lookback`, and `hive.dataset.partition.filter.datetime.format` to configure the filter.

A predicate that operates on partitions can be provided to distcp-ng to allow it to quickly skip partitions without having to list all of the source and target files and diff those sets (a costly operation). To set this predicate, provide the class name of the predicate with the key `hive.dataset.copy.fast.partition.skip.predicate`. Currently only one such predicate exists:
* `RegistrationTimeSkipPredicate`: this predicate compares the Hive partition attribute `registrationGenerationTimeMillis` in the target with the modification time of the partition directory in the source. The partition is skipped unless the directory was modified more recently than the registration generation time. `registrationGenerationTimeMillis` is an attribute set by distcp-ng representing (for all practical purposes) the time at which the distcp-ng job that registered that table started.
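A sketch of enabling this predicate (the fully qualified class name below is an assumption; check the actual package of `RegistrationTimeSkipPredicate` in your Gobblin distribution):

```properties
# Skip partitions whose source directory has not been modified since the
# target's registrationGenerationTimeMillis
hive.dataset.copy.fast.partition.skip.predicate=org.apache.gobblin.data.management.copy.hive.RegistrationTimeSkipPredicate
```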