Compaction can be used to post-process files pulled by Gobblin with certain semantics. Deduplication is one of the common reasons to do compaction, e.g., you may want to
This is because duplicates can be generated for multiple reasons including both intended and unintended:
Gobblin provides two compactors out-of-the-box, a MapReduce compactor and a Hive compactor.
The MapReduce compactor can be used to deduplicate on all or certain fields of the records. For duplicate records, one of them will be preserved; there is no guarantee which one will be preserved.
A use case of MapReduce Compactor is for Kafka records deduplication. We will use the following example use case to explain the MapReduce Compactor.
Suppose we ingest data from a Kafka broker, and we would like to publish the data by hour and by day, both of which are deduplicated:
hourly_staging folder, e.g., /data/kafka_topics/NewUserEvent/hourly_staging/2015/10/29/08...hourly_staging and publish data into hourly, e.g., /data/kafka_topics/NewUserEvent/hourly/2015/10/29/08...hourly and publish data into daily, e.g., /data/kafka_topics/NewUserEvent/daily/2015/10/29...MRCompactor.compact() is the entry point for MapReduce-based compaction. The input data to be compacted is specified by compaction.input.dir. Each subdir under compaction.input.dir is considered a topic. Each topic may contain multiple datasets, each of which is a unit for compaction. It is up to MRCompactorJobPropCreator to determine what is a dataset under each topic. If a topic has multiple levels of folders, subsequent levels can be specified using compaction.input.subdir.
In the above example use case, for hourly compaction, each dataset contains an hour's data in the hourly_staging folder, e.g., /data/kafka_topics/NewUserEvent/hourly_staging/2015/10/29/08; for daily compaction, each dataset contains 24 hourly folder of a day, e.g., /data/kafka_topics/NewUserEvent/hourly/2015/10/29. In hourly compaction, you may use the following config properties:
compaction.input.dir=/data/kafka_topics compaction.dest.dir=/data/kafka_topics compaction.input.subdir=hourly_staging compaction.dest.subdir=hourly compaction.folder.pattern=YYYY/MM/dd compaction.timebased.max.time.ago=3h compaction.timebased.min.time.ago=1h compaction.jobprops.creator.class=gobblin.compaction.mapreduce.MRCompactorTimeBasedJobPropCreator compaction.job.runner.class=gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner (if your data is Avro)
If your data format is not Avro, you can implement a different job runner class for deduplicating your data format. compaction.timebased.max.time.ago and compaction.timebased.min.time.ago are used to control the earliest and latest folders to be processed, e.g., if there values are 3h and 1h, respectively, and suppose the current time is 10/07 9:20am, it will not process folders on 10/07/06 or before (since they are more than 3 hours ago) or folders on 10/07/09 (since they are less than 1 hour ago).
There are two types of Non-deduping compaction.
Property compaction.input.deduplicated specifies whether the input data are deduplicated (default is false), and property compaction.output.deduplicated specifies whether the output data should be deduplicated (default is true). For type 1 deduplication, set both to false. For type 2 deduplication, set both to true.
The reason these two types of compaction need to be separated is because of late data handling, which we will explain next.
Late records are records that arrived at a folder after compaction on this folder has started. We explain how Gobblin handles late records using the following example.
In this use case, both hourly compaction and daily compaction need a mechanism to handle late records. For hourly compaction, late records are records that arrived at an hourly_staging folder after the hourly compaction of that folder has started. It is similar for daily compaction.
Compaction with Deduplication
For a compaction with deduplication (i.e., hourly compaction in the above use case), there are two options to deal with late data:
To do so, set compaction.job.overwrite.output.dir=true and compaction.recompact.from.input.for.late.data=true.
Please note the following when you use this option: (1) this means that your already-published data will be re-published if late data are detected; (2) this is potentially dangerous if your input folders have short retention periods. For example, suppose hourly_staging folders have a 2-day retention period, i.e., folder /data/kafka_topics/NewUserEvent/hourly_staging/2015/10/29 will be deleted on 2015/10/31. If, after 2015/10/31, new data arrived at this folder and you re-compact this folder and publish the data to hourly, all original data will be gone. To avoid this problem you may set compaction.timebased.max.time.ago=2d so that compaction will not be performed on a folder more than 2 days ago. However, this means that if a late record is late for more than 2 days, it will never be published into hourly.
[output_subdir]/_late folder, e.g., for hourly compaction, late data in hourly_staging will be copied to hourly_late folders, e.g., /data/kafka_topics/NewUserEvent/hourly_late/2015/10/29....If re-compaction is not necessary, this is all you need to do. If re-compaction is needed, you may schedule or manually invoke a re-compaction job which will re-compact by consuming data in both hourly and hourly_late. For this job, you need to set compaction.job.overwrite.output.dir=true and compaction.recompact.from.dest.paths=true.
Note that this re-compaction is different from the re-compaction in Option 1: this re-compaction consumes data in output folders (i.e., hourly) whereas the re-compaction in Option 1 consumes data in input folders (i.e., hourly_staging).
Compaction without Deduplication
For a compaction without deduplication, if it is type 2, the same two options above apply. If it is type 1, late data will simply be copied to the output folder.
How to Determine if a Data File is Late
Every time a compaction finishes (except the case below), Gobblin will create a file named _COMPACTION_COMPLETE in the compaction output folder. This file contains the timestamp of when the compaction job starts. All files in the input folder with earlier modification timestamps have been compacted. Next time the compaction runs, files in the input folder with later timestamps are considered late data.
The _COMPACTION_COMPLETE file will be only be created if it is a regular compaction that consumes input data (including compaction jobs that just copy late data to the output folder or the [output_subdir]/_late folder without launching an MR job). It will not be created if it is a re-compaction that consumes output data. This is because whether a file in the input folder is a late file depends on whether it has been compacted or moved into the output folder, which is not affected by a re-compaction that consumes output data.
One way of reducing the chance of seeing late records is to verify data completeness before running compaction, which will be explained next.
Besides aborting the compaction job for a dataset if new data in the input folder is found, another way to reduce the chance of seeing late events is to verify the completeness of input data before running compaction. To do so, set compaction.completeness.verification.enabled=true, extend DataCompletenessVerifier.AbstractRunner and put in your verification logic, and pass it via compaction.completeness.verification.class.
When data completeness verification is enabled, MRCompactor will verify data completeness for the input datasets, and meanwhile speculatively start the compaction MR jobs. When the compaction MR job for a dataset finishes, if the completeness of the dataset is verified, its compacted data will be published, otherwise it is discarded, and the compaction MR job for this dataset will be launched again with a reduced priority.
It is possible to control which topics should or should not be verified via compaction.completeness.verification.whitelist and compaction.completeness.verification.blacklist. It is also possible to set a timeout for data completeness verification via compaction.completeness.verification.timeout.minutes. A dataset whose completeness verification timed out can be configured to be either compacted anyway or not compacted.
The Hive compactor can be used to merge a snapshot with one or multiple deltas. It assumes the snapshot and the deltas meet the following requirements:
In the near future we also plan to support selecting records by timestamps (rather than which file they appear). This is useful if the snapshot and the deltas are pulled in parallel, where if a key has multiple occurrences we should keep the one with the latest timestamp.
Note that since delta tables don't have information of deleted records, such information is only available the next time the full snapshot is pulled.
After building Gobblin (i.e., ./gradlew clean build), a zipped file build/gobblin-compaction/distributions/gobblin-compaction.tar.gz should be created. It contains a jar file (gobblin-compaction.jar), a folder of dependencies (gobblin-compaction_lib), and a log4j config file (log4j.xml).
To run compaction, extract it into a folder, go to that folder and run
java -jar gobblin-compaction.jar <global-config-file>
If for whatever reason (e.g., your Hadoop cluster is in secure mode) you need to run the jar using Hadoop or Yarn, then you first need to make sure the correct log4j config file is used, since there is another log4j config file in the Hadoop classpath. To do so, run the following two commands:
export HADOOP_CLASSPATH=. export HADOOP_USER_CLASSPATH_FIRST=true
The first command adds the current directory to the Hadoop classpath, and the second command tells Hadoop/Yarn to prioritize user's classpath. Then you can run the compaction jar:
hadoop jar gobblin-compaction.jar <global-config-file>
or
yarn jar gobblin-compaction.jar <global-config-file>
The merged data will be written to the HDFS directory specified in output.datalocation, as one or more Avro files. The schema of the output data will be the same as the schema of the last delta (which is the last pulled data and thus has the latest schema).
The provided log4j config file (log4j.xml) prints logs from Gobblin compaction classes to the console, and writes logs from other classes (e.g., Hive classes) to logs/gobblin-compaction.log. Note that for drop table queries (DROP TABLE IF EXISTS <tablename>), the Hive JDBC client will throw NoSuchObjectException if the table doesn't exist. This is normal and such exceptions should be ignored.
(1) Required:
This is the the compaction jobconfig directory. Each file in this directory should be a jobconfig file (described in the next section).
(2) Optional:
Hadoop configuration files that should be loaded (e.g., hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml)
If property fs.defaultFS (or fs.default.name) is specified in the hadoop config file, then this property is not needed. However, if it is specified, it will override fs.defaultFS (or fs.default.name).
If fs.defaultFS or fs.default.name is not specified in the hadoop config file, and this property is also not specified, then the default value “hdfs://localhost:9000” will be used.
Either 1 or 2.
hiveserver.connection.string
hiveserver.url
hiveserver.user (default: "")
hiveserver.password (default: "")
If hiveserver.connection.string is specified, it will be used to connect to hiveserver.
If hiveserver.connection.string is not specified but hiveserver.url is specified, then it uses (hiveserver.url, hiveserver.user, hiveserver.password) to connect to hiveserver.
If neither hiveserver.connection.string nor hiveserver.url is specified, then embedded hiveserver will be used (i.e., jdbc:hive:// if hiveserver.version=1, jdbc:hive2:// if hiveserver.version=2)
Directory that contains hive-site.xml, if hive-site.xml should be loaded.
Any hive config property. (e.g., hive.join.cache.size). If specified, it will override the corresponding property in hive-site.xml.
(1) Required:
comma separated primary key attributes of the snapshot table
snapshot data directory in HDFS
the primary key of ith delta table (the primary key of snapshot and all deltas should be the same)
ith delta table's data directory in HDFS
the HDFS data directory for the output (make sure you have write permission on this directory)
(2) Optional:
prefix name of the snapshot table. The table name will be snapshot.name + random suffix
snapshot table's schema location in HDFS. If not specified, schema will be extracted from the data.
prefix name of the ith delta table. The table name will be delta.i.name + random suffix
ith delta table's schema location in HDFS. If not specified, schema will be extracted from the data.
prefix name of the output table. The table name will be output.name + random suffix
the database name to be used. This database should already exist, and you should have write permission on it.
queue name to be used.
whether map-side join should be turned on. If specified both in this property and in the global config file (hive.*), this property takes precedences.
if hive.use.mapjoin = true, mapjoin will be used if the small table size is smaller than hive.mapjoin.smalltable.filesize (in bytes). If specified both in this property and in the global config file (hive.*), this property takes precedences.
If we need to extract schema from data, this dir is for the extracted schema. Note that if you do not have write permission on the default dir, you must specify this property as a dir where you do have write permission.
Set to true if you don't want to (or are unable to) create external table on snapshot.datalocation. A copy of the snapshot data will be created in hive.tmpdata.dir, and will be removed after the compaction.
This property should be set to true if either of the following two situations applies:
(i) You don't have write permission to snapshot.datalocation. If so, once you create an external table on snapshot.datalocation, you may not be able to drop it. This is a Hive bug and for more information, see this page, which includes a Hive patch for the bug.
(ii) You want to use a certain subset of files in snapshot.datalocation (e.g., snapshot.datalocation contains both .csv and .avro files but you only want to use .avro files)
Similar as snapshot.copydata
If snapshot.copydata = true or delta.i.copydata = true, the data will be copied to this dir. You should have write permission to this dir.
If snapshot.copydata = true, then only those data files whose extension is snapshot.dataformat will be moved to hive.tmpdata.dir.
Similar as snapshot.dataformat.extension.name.
Number of reducers for the job.
A file where the running time of each compaction job is printed.