// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
== High-Speed Ingest
Accumulo is often used as part of a larger data processing and storage system. To
maximize the performance of a parallel system involving Accumulo, the ingestion
and query components should be designed to provide enough parallelism and
concurrency to avoid creating bottlenecks for users and other systems writing to
and reading from Accumulo. There are several ways to achieve high ingest
performance.
=== Pre-Splitting New Tables
New tables consist of a single tablet by default. As mutations are applied, the table
grows and splits into multiple tablets which are balanced by the Master across
TabletServers. Until the table has grown to the point where there are tablets on
every TabletServer, the aggregate ingest rate is limited to a subset of the machines
available within the cluster.
Pre-splitting a table ensures that the desired number of tablets exists before ingest
begins, so that all of the parallelism the cluster hardware offers can be exploited
from the start. Tables can be split at any time using the shell:
----
user@myinstance mytable> addsplits -sf /local_splitfile -t mytable
----
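The same pre-splitting can be done programmatically through the client API. A
minimal sketch follows, assuming an existing +Connector+; the table name and the
one-split-per-letter scheme are illustrative, and splits should be chosen to match
the expected rowID distribution.

[source,java]
----
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.hadoop.io.Text;

public class PreSplitExample {
  // Adds split points to an existing table so tablets are spread
  // across TabletServers before ingest begins.
  public static void preSplit(Connector connector, String tableName) throws Exception {
    SortedSet<Text> splits = new TreeSet<Text>();
    for (char c = 'a'; c <= 'z'; c++) {
      splits.add(new Text(String.valueOf(c))); // illustrative split points
    }
    connector.tableOperations().addSplits(tableName, splits);
  }
}
----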
For the purposes of providing parallelism to ingest, it is not necessary to create more
tablets than there are physical machines within the cluster, as the aggregate ingest
rate is a function of the number of physical machines. Note that the aggregate ingest
rate is still subject to the number of machines running ingest clients and to the
distribution of rowIDs across the table. The aggregate ingest rate will be
suboptimal if many inserts target a small number of rowIDs.
=== Multiple Ingester Clients
Accumulo is capable of scaling to very high rates of ingest, which is dependent upon
not just the number of TabletServers in operation but also the number of ingest
clients. This is because a single client, while capable of batching mutations and
sending them to all TabletServers, is ultimately limited by the amount of data that
can be processed on a single machine. The aggregate ingest rate will scale linearly
with the number of clients up to the point at which either the aggregate I/O of
TabletServers or total network bandwidth capacity is reached.
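For reference, a single ingest client typically looks like the following minimal
sketch, which uses a +BatchWriter+ to batch mutations; the table name, rowIDs, and
column names are placeholders. Running several such clients in parallel on separate
machines is what scales the aggregate rate.

[source,java]
----
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class IngestClientSketch {
  public static void ingest(Connector connector, String tableName) throws Exception {
    BatchWriter writer = connector.createBatchWriter(tableName, new BatchWriterConfig());
    try {
      for (int i = 0; i < 1000000; i++) {
        Mutation m = new Mutation(new Text(String.format("row_%08d", i)));
        m.put(new Text("colFam"), new Text("colQual"), new Value("value".getBytes()));
        writer.addMutation(m); // mutations are buffered and sent to TabletServers in batches
      }
    } finally {
      writer.close(); // flushes any remaining buffered mutations
    }
  }
}
----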
In operational settings where high rates of ingest are paramount, clusters are often
configured to dedicate some number of machines solely to running Ingester Clients.
The exact ratio of clients to TabletServers necessary for optimum ingestion rates
will vary according to the distribution of resources per machine and by data type.
=== Bulk Ingest
Accumulo supports importing files produced by an external process, such as
MapReduce, into an existing table. In some cases it may be faster to load data this
way than to ingest it through clients using BatchWriters. This allows a large
number of machines to format data the way Accumulo expects. The new files can
then simply be introduced to Accumulo via a shell command.
To configure MapReduce to format data in preparation for bulk loading, the job
should be set to use a range partitioner instead of the default hash partitioner. The
range partitioner uses the split points of the Accumulo table that will receive the
data. The split points can be obtained from the shell and used by the MapReduce
RangePartitioner. Note that this is only useful if the existing table is already split
into multiple tablets.
----
user@myinstance mytable> getsplits
aa
ab
ac
...
zx
zy
zz
----
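A job configured along these lines might look like the following sketch, which uses
Accumulo's +RangePartitioner+ and +AccumuloFileOutputFormat+. It assumes the split
points shown above have been saved to a file in HDFS; the split-file path and output
directory are illustrative, and the mapper and reducer classes are omitted.

[source,java]
----
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class BulkPrepJob {
  public static Job configure(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "bulk ingest prep");
    job.setJarByClass(BulkPrepJob.class);
    // The mapper emits (row, columns) pairs; the reducer turns them into
    // sorted Key/Value pairs for AccumuloFileOutputFormat.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // Partition by the table's split points so each reduce task writes a
    // file aligned with a single tablet's range.
    job.setPartitionerClass(RangePartitioner.class);
    RangePartitioner.setSplitFile(job, "/tmp/mytable_splits.txt"); // splits saved from getsplits
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
    job.setOutputKeyClass(Key.class);
    job.setOutputValueClass(Value.class);
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/files_dir"));
    return job;
  }
}
----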
Run the MapReduce job, using the AccumuloFileOutputFormat to create the files to
be introduced to Accumulo. Once this is complete, the files can be added to
Accumulo via the shell:
----
user@myinstance mytable> importdirectory /files_dir /failures
----
Note that the paths referenced are directories within the same HDFS instance over
which Accumulo is running. Any files that fail to be imported are placed in the
second directory specified.
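The same import can be performed through the client API. A minimal sketch, assuming
an existing +Connector+ named +connector+ and the paths used above:

[source,java]
----
// Bulk-import the files under /files_dir into mytable; files that fail
// to import are moved to /failures. The final flag controls whether
// Accumulo assigns timestamps (see Logical Time for Bulk Ingest below).
connector.tableOperations().importDirectory("mytable", "/files_dir", "/failures", false);
----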
A complete example of using Bulk Ingest can be found at
+accumulo/docs/examples/README.bulkIngest+.
=== Logical Time for Bulk Ingest
Logical time is important for bulk imported data, for which the client code may
be choosing a timestamp. At bulk import time, the user can choose to enable
logical time for the set of files being imported. When it is enabled, Accumulo
uses a specialized system iterator to lazily set times in a bulk imported file.
This mechanism guarantees that times set by unsynchronized multi-node
applications (such as those running on MapReduce) will maintain some semblance
of causal ordering. This mitigates the problem of the time being wrong on the
system that created the file for bulk import. These times are not written into the
file when it is imported; rather, a single timestamp is obtained at import time, and
the specialized system iterator applies it whenever the file is read by scans or
compactions.
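Using the client API shown earlier, this corresponds to the final boolean argument
of +importDirectory+; a sketch, assuming the same +connector+:

[source,java]
----
// Passing true requests that Accumulo assign the import timestamp,
// applied lazily by the specialized system iterator described above.
connector.tableOperations().importDirectory("mytable", "/files_dir", "/failures", true);
----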
The timestamp assigned by Accumulo will be the same for every key in the file.
This could cause problems if the file contains multiple keys that are identical
except for the timestamp. In this case, the sort order of the keys will be
undefined. This could occur if an insert and an update were in the same bulk
import file.
=== MapReduce Ingest
It is possible to efficiently write many mutations to Accumulo in parallel via a
MapReduce job. In this scenario the MapReduce job is written to process data that
lives in HDFS and to write mutations to Accumulo using the AccumuloOutputFormat. See
the MapReduce section under Analytics for details.
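A minimal sketch of the map task for such a job follows. It assumes a map-only job
configured with +AccumuloOutputFormat+ (connection settings omitted); the table,
rowID, and column names are illustrative.

[source,java]
----
import java.io.IOException;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Each input line becomes a Mutation written directly to Accumulo
// through AccumuloOutputFormat.
public class IngestMapper extends Mapper<LongWritable,Text,Text,Mutation> {
  private static final Text TABLE = new Text("mytable"); // output key selects the destination table

  @Override
  public void map(LongWritable key, Text line, Context context)
      throws IOException, InterruptedException {
    Mutation m = new Mutation(new Text(line.toString())); // rowID from the input line; illustrative
    m.put(new Text("colFam"), new Text("colQual"), new Value(new byte[0]));
    context.write(TABLE, m);
  }
}
----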
An example of using MapReduce can be found under
+accumulo/docs/examples/README.mapred+.