MASC provides an Apache Spark native connector for Apache Accumulo, integrating the rich Spark machine learning ecosystem with the scalable and secure data storage capabilities of Accumulo. MASC is advantageous in many use cases; below we list a few.
Scenario 1: A data analyst needs to execute model inference on a large amount of data in Accumulo.
Benefit: Instead of transferring all the data to a large Spark cluster to score with a Spark model, the connector exports the model and runs it on the Accumulo cluster. This reduces both the required Spark cluster size and the amount of data transferred between systems, and can improve inference speeds (>2x speedups observed).
Scenario 2: A data scientist needs to train a Spark model on a large amount of data in Accumulo.
Benefit: Instead of pulling all the data into a large Spark cluster and restructuring it for Spark MLlib tools, the connector streams the data into Spark as a DataFrame, reducing training time as well as Spark cluster size and memory requirements.
Scenario 3: A data analyst needs to perform ad hoc analysis on large amounts of data stored in Accumulo.
Benefit: Instead of pulling all the data into a large Spark cluster, the connector prunes rows and columns using pushdown filtering with a flexible expression language.
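To make this concrete, here is a minimal sketch of pushdown filtering from the Spark side. It assumes an existing `spark` session plus the connector `properties` and `schema` defined in the code example further below; see the Connector documentation for the full expression language.

```python
# Minimal sketch: predicates and column selections on the DataFrame are
# pushed down to the Accumulo iterator, so pruning happens server-side
# before any data is transferred to Spark.
df = (spark.read
      .format("com.microsoft.accumulo")
      .options(**properties)  # Accumulo client properties, incl. 'table'
      .schema(schema)
      .load())

subset = (df
          .filter(df["sentiment"] == 0)  # row pruning via pushdown filter
          .select("user", "text"))       # column pruning
subset.show(10)
```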
The Accumulo-Spark connector is composed of two components, with JARs for both available on the Maven Central Repository:

- Spark DataSource - client-side API for reading and writing Spark DataFrames
- Accumulo Iterator - backend for the Spark DataSource that runs server-side on the Accumulo tablet servers

More detailed documentation on installation and use is available in the Connector documentation.
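For instance, one way to pull the DataSource into a PySpark session is via `spark.jars.packages`; the Maven coordinates below are illustrative placeholders, so check Maven Central for the exact group, artifact, and version.

```python
from pyspark.sql import SparkSession

# Illustrative coordinates only; verify the exact group/artifact/version
# on Maven Central before use.
spark = (SparkSession.builder
         .appName("masc-demo")
         .config("spark.jars.packages",
                 "com.microsoft.masc:microsoft-accumulo-spark-datasource:1.0.0")
         .getOrCreate())
```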
```python
from configparser import ConfigParser
from pyspark.sql import types as T

def get_properties(properties_file):
    """Read Accumulo client properties file"""
    config = ConfigParser()
    with open(properties_file) as stream:
        config.read_string("[top]\n" + stream.read())
    return dict(config['top'])

properties = get_properties('/opt/muchos/install/accumulo-2.0.0/conf/accumulo-client.properties')
properties['table'] = 'demo_table'  # Define Accumulo table where data will be written
properties['rowkey'] = 'id'         # Identify column to use as the key for Accumulo rows

# Define the schema
schema = T.StructType([
    T.StructField("sentiment", T.IntegerType(), True),
    T.StructField("date", T.StringType(), True),
    T.StructField("query_string", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
    T.StructField("text", T.StringType(), True)
])

# Read from Accumulo
df = (spark
      .read
      .format("com.microsoft.accumulo")
      .options(**properties)  # define Accumulo properties
      .schema(schema)         # define schema for data retrieval
      .load())

# Write to Accumulo
properties['table'] = 'output_table'
(df
 .write
 .format("com.microsoft.accumulo")
 .options(**properties)
 .save())
```
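Note that the `rowkey` option designates which DataFrame column becomes the Accumulo row ID, while the schema controls which columns are retrieved; as with any Spark data source, the read is lazy, so no data moves until an action such as `count()` or `show()` is triggered.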
See the demo notebook for more examples.
The benchmark setup used a 1,000-node Accumulo 2.0.0 cluster (16,000 cores) and a 256-node Spark 2.4.3 cluster (4,096 cores). All nodes were Azure D16s_v3 virtual machines (16 cores each). Fluo-muchos was used to deploy and configure both the Accumulo and Spark clusters.
All experiments used the same base dataset, a collection of Twitter user tweets with labeled sentiment values, known as the Sentiment140 dataset (Go, Bhayani, & Huang, 2009). The training data consist of 1.6M tweets, each with columns for the sentiment label, user, timestamp, query term, and text. The text is limited to 140 characters, and the overall uncompressed size of the training dataset is 227 MB.
| sentiment | id | date | query_string | user | text |
|---|---|---|---|---|---|
| 0 | 1467810369 | Mon Apr 06 22:19:... | NO_QUERY | TheSpecialOne | @switchfoot http:... |
| 0 | 1467810672 | Mon Apr 06 22:19:... | NO_QUERY | scotthamilton | is upset that he ... |
| 0 | 1467810917 | Mon Apr 06 22:19:... | NO_QUERY | mattycus | @Kenichan I dived... |
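For reference, here is a sketch of loading the raw Sentiment140 CSV into Spark with a matching schema; the local file path is a hypothetical placeholder, not taken from the benchmark setup.

```python
from pyspark.sql import types as T

# Sentiment140 columns in file order, matching the table above
raw_schema = T.StructType([
    T.StructField("sentiment", T.IntegerType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("date", T.StringType(), True),
    T.StructField("query_string", T.StringType(), True),
    T.StructField("user", T.StringType(), True),
    T.StructField("text", T.StringType(), True),
])

tweets = (spark.read
          .schema(raw_schema)
          .csv("/tmp/sentiment140.csv"))  # hypothetical path to the raw CSV
```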
To evaluate different table sizes and the impact of table splits, Accumulo tables of varying sizes were generated from this base dataset.
A common machine learning scenario was evaluated using a sentiment model trained with SparkML. To train the classification model, we generated feature vectors from the text of the tweets (the text column) using a feature engineering pipeline (a.k.a. featurizer) that breaks the text into tokens, splitting on whitespace and discarding capitalization and non-alphabetical characters, before feeding the resulting features to a classifier.
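Below is a minimal sketch consistent with that description; the specific stages (RegexTokenizer, HashingTF, LogisticRegression) are assumptions for illustration, and the benchmark notebook has the exact pipeline.

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# Lowercase the text and keep only alphabetic tokens
tokenizer = RegexTokenizer(inputCol="text", outputCol="words",
                           pattern="[a-z]+", gaps=False, toLowercase=True)
# Hash the tokens into a fixed-size term-frequency feature vector
hashing_tf = HashingTF(inputCol="words", outputCol="features")
# Train a classifier on the hashed features
lr = LogisticRegression(labelCol="sentiment",   # assumes labels mapped to {0, 1}
                        featuresCol="features")

pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
model = pipeline.fit(df)  # df as read from Accumulo above
```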
See the benchmark notebook (Scala) for more details.
The first set of experiments evaluated data transfer efficiency and ML model inference performance; the results are shown in the chart below.

*(Chart: data transfer and model inference performance.)*
{:.blog-img-center}
The second set of experiments highlighted the computational performance improvement of the server-side inference approach over running inference on the Spark cluster; the results are shown in the chart below.

*(Chart: server-side vs. Spark-cluster inference performance.)*
{:.blog-img-center}
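For context, the server-side approach works by exporting the trained SparkML pipeline as an MLeap bundle and handing it to the Accumulo iterator, which scores rows where they live. The sketch below illustrates the idea; the `mleap` option key, paths, and encoding are assumptions here, so consult the Connector documentation for the exact interface.

```python
import base64
import mleap.pyspark  # noqa: F401 (registers serializeToBundle on SparkML models)
from mleap.pyspark.spark_support import SimpleSparkSerializer  # noqa: F401

# Export the trained pipeline model to an MLeap bundle
model.serializeToBundle("jar:file:/tmp/sentiment-model.zip",
                        model.transform(df))

# Base64-encode the bundle so it can be passed to the iterator as an option
with open("/tmp/sentiment-model.zip", "rb") as f:
    bundle = base64.b64encode(f.read()).decode("utf-8")

# Score rows server-side during the scan; 'mleap' as the option key is an
# assumption, check the Connector documentation for the exact name.
scored = (spark.read
          .format("com.microsoft.accumulo")
          .options(**properties)
          .option("mleap", bundle)
          .schema(schema)
          .load())
```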
This work is publicly available under the Apache License 2.0 on GitHub as part of Microsoft's contributions for Apache Spark with Apache Accumulo.
Feedback, questions, and contributions are welcome!
Thanks to contributions from members of the Azure Global Customer Engineering and Azure Government teams.
Special thanks to Anca Sarb for promptly assisting with MLeap performance issues.