Compaction can be used to post-process files pulled by Gobblin with certain semantics. Deduplication is one of the common reasons to do compaction, e.g., you may want to
This is because duplicates can be generated for multiple reasons including both intended and unintended:
Gobblin provides two compactors out-of-the-box, a MapReduce compactor and a Hive compactor.
The MapReduce compactor can be used to deduplicate on all or certain fields of the records. For duplicate records, one of them will be preserved; there is no guarantee which one will be preserved.
A use case of MapReduce Compactor is for Kafka records deduplication. We will use the following example use case to explain the MapReduce Compactor.
Suppose we ingest data from a Kafka broker, and we would like to publish the data by hour and by day, both of which are deduplicated:
hourly_staging
folder, e.g., /data/kafka_topics/PageViewEvent/hourly_staging/2015/10/29/08...
hourly_staging
and publish data into hourly
, e.g., /data/kafka_topics/PageViewEvent/hourly/2015/10/29/08...
hourly
and publish data into daily
, e.g., /data/kafka_topics/PageViewEvent/daily/2015/10/29...
MRCompactor.compact()
is the entry point for MapReduce-based compaction. The compaction unit is Dataset
. MRCompactor
uses a DatasetsFinder
to find all datasets eligible for compaction. Implementations of DatasetsFinder
include SimpleDatasetsFinder
and TimeBasedSubDirDatasetsFinder
.
In the above example use case, for hourly compaction, each dataset contains an hour's data in the hourly_staging
folder, e.g., /data/kafka_topics/PageViewEvent/hourly_staging/2015/10/29/08
; for daily compaction, each dataset contains 24 hourly folder of a day, e.g., /data/kafka_topics/PageViewEvent/hourly/2015/10/29
. In hourly compaction, you may use the following config properties:
compaction.datasets.finder=org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder compaction.input.dir=/data/kafka_topics compaction.dest.dir=/data/kafka_topics compaction.input.subdir=hourly_staging compaction.dest.subdir=hourly compaction.folder.pattern=YYYY/MM/dd compaction.timebased.max.time.ago=3h compaction.timebased.min.time.ago=1h compaction.jobprops.creator.class=org.apache.gobblin.compaction.mapreduce.MRCompactorTimeBasedJobPropCreator compaction.job.runner.class=org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner (if your data is Avro)
If your data format is not Avro, you can implement a different job runner class for deduplicating your data format.
There are two types of Non-deduping compaction.
Property compaction.input.deduplicated
specifies whether the input data are deduplicated (default is false), and property compaction.output.deduplicated
specifies whether the output data should be deduplicated (default is true). For type 1 deduplication, set both to false. For type 2 deduplication, set both to true.
The reason these two types of compaction need to be separated is because of late data handling, which we will explain next.
Late records are records that arrived at a folder after compaction on this folder has started. We explain how Gobblin handles late records using the following example.
In this use case, both hourly compaction and daily compaction need a mechanism to handle late records. For hourly compaction, late records are records that arrived at an hourly_staging
folder after the hourly compaction of that folder has started. It is similar for daily compaction.
Compaction with Deduplication
For a compaction with deduplication (i.e., hourly compaction in the above use case), there are two options to deal with late data:
To do so, set compaction.job.overwrite.output.dir=true
and compaction.recompact.from.input.for.late.data=true
.
Please note the following when you use this option: (1) this means that your already-published data will be re-published if late data are detected; (2) this is potentially dangerous if your input folders have short retention periods. For example, suppose hourly_staging
folders have a 2-day retention period, i.e., folder /data/kafka_topics/PageViewEvent/hourly_staging/2015/10/29
will be deleted on 2015/10/31. If, after 2015/10/31, new data arrived at this folder and you re-compact this folder and publish the data to hourly
, all original data will be gone. To avoid this problem you may set compaction.timebased.max.time.ago=2d
so that compaction will not be performed on a folder more than 2 days ago. However, this means that if a late record is late for more than 2 days, it will never be published into hourly
.
[output_subdir]/_late
folder, e.g., for hourly compaction, late data in hourly_staging
will be copied to hourly_late
folders, e.g., /data/kafka_topics/PageViewEvent/hourly_late/2015/10/29...
.If re-compaction is not necessary, this is all you need to do. If re-compaction is needed, you may schedule or manually invoke a re-compaction job which will re-compact by consuming data in both hourly
and hourly_late
. For this job, you need to set compaction.job.overwrite.output.dir=true
and compaction.recompact.from.dest.paths=true
.
Note that this re-compaction is different from the re-compaction in Option 1: this re-compaction consumes data in output folders (i.e., hourly
) whereas the re-compaction in Option 1 consumes data in input folders (i.e., hourly_staging
).
Compaction without Deduplication
For a compaction without deduplication, if it is type 2, the same two options above apply. If it is type 1, late data will simply be copied to the output folder.
How to Determine if a Data File is Late
Every time a compaction finishes (except the case below), Gobblin will create a file named _COMPACTION_COMPLETE
in the compaction output folder. This file contains the timestamp of when the compaction job starts. All files in the input folder with earlier modification timestamps have been compacted. Next time the compaction runs, files in the input folder with later timestamps are considered late data.
The _COMPACTION_COMPLETE
file will be only be created if it is a regular compaction that consumes input data (including compaction jobs that just copy late data to the output folder or the [output_subdir]/_late
folder without launching an MR job). It will not be created if it is a re-compaction that consumes output data. This is because whether a file in the input folder is a late file depends on whether it has been compacted or moved into the output folder, which is not affected by a re-compaction that consumes output data.
One way of reducing the chance of seeing late records is to verify data completeness before running compaction, which will be explained next.
Besides aborting the compaction job for a dataset if new data in the input folder is found, another way to reduce the chance of seeing late events is to verify the completeness of input data before running compaction. To do so, set compaction.completeness.verification.enabled=true
, extend DataCompletenessVerifier.AbstractRunner
and put in your verification logic, and pass it via compaction.completeness.verification.class
.
When data completeness verification is enabled, MRCompactor
will verify data completeness for the input datasets, and meanwhile speculatively start the compaction MR jobs. When the compaction MR job for a dataset finishes, if the completeness of the dataset is verified, its compacted data will be published, otherwise it is discarded, and the compaction MR job for this dataset will be launched again with a reduced priority.
It is possible to control which topics should or should not be verified via compaction.completeness.verification.whitelist
and compaction.completeness.verification.blacklist
. It is also possible to set a timeout for data completeness verification via compaction.completeness.verification.timeout.minutes
. A dataset whose completeness verification timed out can be configured to be either compacted anyway or not compacted.
The Hive compactor can be used to merge a snapshot with one or multiple deltas. It assumes the snapshot and the deltas meet the following requirements:
The merged data will be written to the HDFS directory specified in output.datalocation
, as one or more Avro files. The schema of the output data will be the same as the schema of the last delta (which is the last pulled data and thus has the latest schema).
In the near future we also plan to support selecting records by timestamps (rather than which file they appear). This is useful if the snapshot and the deltas are pulled in parallel, where if a key has multiple occurrences we should keep the one with the latest timestamp.
Note that since delta tables don't have information of deleted records, such information is only available the next time the full snapshot is pulled.
A Hive Compactor job consists of one global configuration file which refers to one or more job configuration(s).
(1) Required:
This is the the compaction jobconfig directory. Each file in this directory should be a jobconfig file (described in the next section).
(2) Optional:
Hadoop configuration files that should be loaded (e.g., hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml)
If property fs.defaultFS
(or fs.default.name
) is specified in the hadoop config file, then this property is not needed. However, if it is specified, it will override fs.defaultFS
(or fs.default.name
).
If fs.defaultFS
or fs.default.name
is not specified in the hadoop config file, and this property is also not specified, then the default value “hdfs://localhost:9000” will be used.
Either 1 or 2.
hiveserver.connection.string
hiveserver.url
hiveserver.user (default: "")
hiveserver.password (default: "")
If hiveserver.connection.string
is specified, it will be used to connect to hiveserver.
If hiveserver.connection.string
is not specified but hiveserver.url
is specified, then it uses (hiveserver.url
, hiveserver.user
, hiveserver.password
) to connect to hiveserver.
If neither hiveserver.connection.string
nor hiveserver.url
is specified, then embedded hiveserver will be used (i.e., jdbc:hive://
if hiveserver.version=1
, jdbc:hive2://
if hiveserver.version=2
)
Directory that contains hive-site.xml, if hive-site.xml should be loaded.
Any hive config property. (e.g., hive.join.cache.size
). If specified, it will override the corresponding property in hive-site.xml.
(1) Required:
comma separated primary key attributes of the snapshot table
snapshot data directory in HDFS
the primary key of ith delta table (the primary key of snapshot and all deltas should be the same)
ith delta table's data directory in HDFS
the HDFS data directory for the output (make sure you have write permission on this directory)
(2) Optional:
prefix name of the snapshot table. The table name will be snapshot.name + random suffix
snapshot table's schema location in HDFS. If not specified, schema will be extracted from the data.
prefix name of the ith delta table. The table name will be delta.i.name + random suffix
ith delta table's schema location in HDFS. If not specified, schema will be extracted from the data.
prefix name of the output table. The table name will be output.name + random suffix
the database name to be used. This database should already exist, and you should have write permission on it.
queue name to be used.
whether map-side join should be turned on. If specified both in this property and in the global config file (hive.*), this property takes precedences.
if hive.use.mapjoin = true, mapjoin will be used if the small table size is smaller than hive.mapjoin.smalltable.filesize (in bytes). If specified both in this property and in the global config file (hive.*), this property takes precedences.
If we need to extract schema from data, this dir is for the extracted schema. Note that if you do not have write permission on the default dir, you must specify this property as a dir where you do have write permission.
Set to true if you don't want to (or are unable to) create external table on snapshot.datalocation. A copy of the snapshot data will be created in hive.tmpdata.dir
, and will be removed after the compaction.
This property should be set to true if either of the following two situations applies:
(i) You don't have write permission to snapshot.datalocation
. If so, once you create an external table on snapshot.datalocation
, you may not be able to drop it. This is a Hive bug and for more information, see this page, which includes a Hive patch for the bug.
(ii) You want to use a certain subset of files in snapshot.datalocation
(e.g., snapshot.datalocation
contains both .csv and .avro files but you only want to use .avro files)
Similar as snapshot.copydata
If snapshot.copydata
= true or delta.i.copydata
= true, the data will be copied to this dir. You should have write permission to this dir.
If snapshot.copydata
= true, then only those data files whose extension is snapshot.dataformat
will be moved to hive.tmpdata.dir
.
Similar as snapshot.dataformat.extension.name
.
Number of reducers for the job.
A file where the running time of each compaction job is printed.
Both the MapReduce and Hive-based compaction configurations can be executed with bin/gobblin-compaction.sh
.
The usage is as follows:
gobblin-compaction.sh [OPTION] --type <compaction type: hive or mr> --conf <compaction configuration file> Where OPTION can be: --projectversion <version> Gobblin version to be used. If set, overrides the distribution build version --logdir <log dir> Gobblin's log directory: if not set, taken from ${GOBBLIN_LOG_DIR} if present. --help Display this help and exit
Example:
cd gobblin-dist bin/gobblin-compaction.sh --type hive --conf compaction.properties
The log4j configuration is read from conf/log4j-compaction.xml
. Please note that in case of a Hive-compaction for drop table queries (DROP TABLE IF EXISTS <tablename>
), the Hive JDBC client will throw NoSuchObjectException
if the table doesn't exist. This is normal and such exceptions should be ignored.