The anomaly detection example demonstrates how to set up an anomaly detection pipeline that reads text from Pub/Sub in real time, and then detects anomalies using a trained HDBSCAN clustering model.
This example uses a dataset called emotion that contains 20,000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation, and test (for performance evaluation). Because each example includes both the text and its emotion category (class), it is a supervised dataset. You can use the Hugging Face datasets page to access this dataset.
The following table shows examples from the train split of the dataset:
| Text | Type of emotion |
|---|---|
| im grabbing a minute to post i feel greedy wrong | Anger |
| i am ever feeling nostalgic about the fireplace i will know that it is still on the property | Love |
| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
| on a boat trip to denmark | Joy |
| i feel you know basically like a fake in the realm of science fiction | Sadness |
| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
HDBSCAN is a clustering algorithm that extends DBSCAN by converting it into a hierarchical clustering algorithm and then extracting a flat clustering based on the stability of clusters. When trained, the model predicts -1 if a new data point is an outlier; otherwise, it predicts one of the existing clusters.
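As a minimal illustration of the -1 convention, the sketch below splits a batch of predicted labels into anomalies and clustered points. The labels are made-up values rather than output from the trained model, and split_by_outlier is a hypothetical helper name.

```python
from typing import Dict, List, Tuple

OUTLIER_LABEL = -1  # label HDBSCAN assigns to points that fit no cluster


def split_by_outlier(
    predictions: Dict[str, int],
) -> Tuple[List[str], Dict[str, int]]:
    """Separate UIDs flagged as outliers from UIDs assigned to a cluster."""
    anomalies = [uid for uid, label in predictions.items() if label == OUTLIER_LABEL]
    clustered = {
        uid: label for uid, label in predictions.items() if label != OUTLIER_LABEL
    }
    return anomalies, clustered


# Made-up labels for three messages: two belong to clusters, one is an outlier.
example = {"uid-1": 0, "uid-2": -1, "uid-3": 2}
anomalies, clustered = split_by_outlier(example)
print(anomalies)
print(clustered)
```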
Ingest the data into Pub/Sub so that while clustering, the model can read the tweets from Pub/Sub. Pub/Sub is a messaging service for exchanging event data among applications and services. Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data.
You can see the full example code for ingesting data into Pub/Sub on GitHub.
The file structure for the ingestion pipeline is shown in the following diagram:
    write_data_to_pubsub_pipeline/
    ├── pipeline/
    │   ├── __init__.py
    │   ├── options.py
    │   └── utils.py
    ├── __init__.py
    ├── config.py
    ├── main.py
    └── setup.py
pipeline/utils.py contains the code for loading the emotion dataset and two beam.DoFn that are used for data transformation.
pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.
config.py defines variables that are used multiple times, like Google Cloud PROJECT_ID and NUM_WORKERS.
setup.py defines the packages and requirements for the pipeline to run.
main.py contains the pipeline code and additional functions used for running the pipeline.
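The two transformation steps in pipeline/utils.py are not reproduced here. As a rough sketch of what such steps could look like, the plain Python functions below attach a unique ID to each text and encode the result as message bytes plus string attributes, which is the shape a Pub/Sub message takes. The function names, the attribute key id, and the use of uuid4 are assumptions for illustration, not the example's actual code. In a Beam pipeline, each function could be wrapped in beam.Map or a beam.DoFn.

```python
import uuid
from typing import Dict, Tuple


def assign_uid(text: str) -> Tuple[str, str]:
    """Pair a text with a random unique ID (hypothetical helper)."""
    return str(uuid.uuid4()), text


def to_pubsub_payload(element: Tuple[str, str]) -> Tuple[bytes, Dict[str, str]]:
    """Encode (uid, text) as message bytes plus string attributes.

    Pub/Sub message data is a byte string and attributes are string
    key-value pairs; storing the UID under "id" is an assumption.
    """
    uid, text = element
    return text.encode("utf-8"), {"id": uid}


data, attributes = to_pubsub_payload(assign_uid("on a boat trip to denmark"))
print(data, attributes)
```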
To run the pipeline, install the required packages. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub TOPIC_ID, and others in the config.py file.
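A config.py for this step might look like the following sketch. Every value is a placeholder, and TOPIC_NAME is a hypothetical variable introduced only for illustration; check the example's own config.py for the real variable set.

```python
# config.py (illustrative placeholders only; replace with your own values)
PROJECT_ID = "my-gcp-project"   # placeholder Google Cloud project ID
REGION = "us-central1"          # placeholder Dataflow region
NUM_WORKERS = 1                 # placeholder worker count
TOPIC_NAME = "emotion-tweets"   # hypothetical variable: short topic name

# Beam's Pub/Sub connectors address a topic by its full resource path.
TOPIC_ID = f"projects/{PROJECT_ID}/topics/{TOPIC_NAME}"

print(TOPIC_ID)
```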
Run the pipeline with one of the following commands:
    python main.py
    python main.py --mode cloud
The write_data_to_pubsub_pipeline contains four different transforms:
After ingesting the data to Pub/Sub, run the anomaly detection pipeline. This pipeline reads the streaming message from Pub/Sub, converts the text to an embedding using a language model, and feeds the embedding to an already trained clustering model to predict whether the message is an anomaly. One prerequisite for this pipeline is to have an HDBSCAN clustering model trained on the training split of the dataset.
You can find the full example code for anomaly detection on GitHub.
The following diagram shows the file structure for the anomaly_detection pipeline:
    anomaly_detection_pipeline/
    ├── pipeline/
    │   ├── __init__.py
    │   ├── options.py
    │   └── transformations.py
    ├── __init__.py
    ├── config.py
    ├── main.py
    └── setup.py
pipeline/transformations.py contains the code for different beam.DoFn and additional functions that are used in the pipeline.
pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.
config.py defines variables that are used multiple times, like the Google Cloud PROJECT_ID and NUM_WORKERS.
setup.py defines the packages and requirements for the pipeline to run.
main.py contains the pipeline code and additional functions used to run the pipeline.
Install the required packages and push the data to Pub/Sub. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub SUBSCRIPTION_ID, and others in the config.py file.
Run the pipeline with one of the following commands:
    python main.py
    python main.py --mode cloud
The pipeline includes several steps. The first two read the messages from Pub/Sub and decode them into a PCollection of dictionaries where the key is the UID and the value is the Twitter text. The following code snippet shows the first two steps of the pipeline:
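{{< highlight >}}
docs = (
    pipeline
    # Read streaming messages, keeping their attributes.
    | "Read from PubSub" >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
    # Decode each Pub/Sub message into a dictionary.
    | "Decode PubSubMessage" >> beam.ParDo(Decode())
)
{{< /highlight >}}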
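The Decode DoFn itself is not shown in this snippet. The following is a minimal sketch of the decoding logic, assuming the UID travels in a message attribute named id and that the output dictionary carries an id and a text field. Both are assumptions, as is the FakePubsubMessage stand-in, which mimics the data and attributes fields of a Pub/Sub message so that the snippet runs without Beam.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakePubsubMessage:
    """Stand-in exposing the data and attributes fields of a Pub/Sub message."""

    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


def decode_message(message: FakePubsubMessage) -> Dict[str, str]:
    """Turn one message into a dictionary holding the UID and the text."""
    return {
        "id": message.attributes["id"],       # assumed attribute key
        "text": message.data.decode("utf-8"),  # message payload is bytes
    }


message = FakePubsubMessage(b"on a boat trip to denmark", {"id": "uid-1"})
decoded = decode_message(message)
print(decoded)
```

Inside a beam.DoFn, the same logic would sit in the process method and yield the dictionary instead of returning it.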
The following sections describe the remaining pipeline steps: getting the embedding from the language model, getting predictions from the clustering model, and handling the predictions.
To cluster text data, first map the text into vectors of numerical values suitable for statistical analysis. This example uses a transformer-based language model called sentence-transformers/stsb-distilbert-base. This model maps sentences and paragraphs to a 768-dimensional dense vector space, and you can use it for tasks like clustering or semantic search.
Because the language model is expecting a tokenized input instead of raw text, start by tokenizing the text. Tokenization is a preprocessing task that transforms text so that it can be fed into the model for getting predictions.
{{< highlight >}}
normalized_embedding = (
    docs
    | "Tokenize Text" >> beam.Map(tokenize_sentence)
    # ...the chain continues in the snippets that follow
{{< /highlight >}}
Here, tokenize_sentence is a function that takes a dictionary with a text and an ID, tokenizes the text, and returns a tuple of the text and ID as well as the tokenized output.
{{< highlight file="sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" >}}
{{< code_sample "sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" tokenization >}}
{{< /highlight >}}
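To make the shape of the tokenized output concrete, the sketch below swaps the model's real tokenizer for a toy whitespace tokenizer with a five-entry vocabulary. The vocabulary, the maximum length, and the function name toy_tokenize_sentence are all hypothetical; only the input and output shapes follow the description of tokenize_sentence.

```python
from typing import Dict, List, Tuple

# Toy vocabulary standing in for the language model's real tokenizer.
TOY_VOCAB: Dict[str, int] = {"[PAD]": 0, "[UNK]": 1, "i": 2, "feel": 3, "greedy": 4}
MAX_LENGTH = 6  # hypothetical fixed sequence length


def toy_tokenize_sentence(
    element: Dict[str, str],
) -> Tuple[Tuple[str, str], Dict[str, List[int]]]:
    """Return ((text, id), tokens) for a dictionary with a text and an ID."""
    text, uid = element["text"], element["id"]
    words = text.lower().split()[:MAX_LENGTH]
    # Unknown words map to the [UNK] ID.
    input_ids = [TOY_VOCAB.get(word, TOY_VOCAB["[UNK]"]) for word in words]
    padding = [TOY_VOCAB["[PAD]"]] * (MAX_LENGTH - len(input_ids))
    # The mask is 1 for real tokens and 0 for padding.
    attention_mask = [1] * len(input_ids) + [0] * len(padding)
    tokens = {"input_ids": input_ids + padding, "attention_mask": attention_mask}
    return (text, uid), tokens


key, tokens = toy_tokenize_sentence({"id": "uid-1", "text": "i feel greedy wrong"})
print(key)
print(tokens)
```

Keeping the text and ID together as the key lets a keyed model handler carry them alongside the model output.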
Tokenized output is then passed to the language model to get the embeddings. To get embeddings from the language model, we use RunInference() from Apache Beam.
{{< highlight >}}
    | "Get Embedding" >> RunInference(KeyedModelHandler(embedding_model_handler))
{{< /highlight >}}
where embedding_model_handler is:
{{< highlight >}}
embedding_model_handler = PytorchNoBatchModelHandler(
    state_dict_path=cfg.MODEL_STATE_DICT_PATH,
    model_class=ModelWrapper,
    model_params={"config": AutoConfig.from_pretrained(cfg.MODEL_CONFIG_PATH)},
    device="cpu",
)
{{< /highlight >}}
We define PytorchNoBatchModelHandler as a wrapper to PytorchModelHandler to limit batch size to one.
{{< highlight file="sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/main.py" >}}
{{< code_sample "sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/main.py" PytorchNoBatchModelHandler >}}
{{< /highlight >}}
Because the forward() method of DistilBertModel doesn't return the embeddings, we define a custom model_class, ModelWrapper, to get the vector embedding.
{{< highlight file="sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" >}}
{{< code_sample "sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" DistilBertModelWrapper >}}
{{< /highlight >}}
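One common way to reduce per-token model outputs to a single sentence vector is attention-masked mean pooling. Whether ModelWrapper does exactly this is an assumption; the pure-Python sketch below only illustrates the idea on toy numbers, and masked_mean_pool is a hypothetical name.

```python
from typing import List


def masked_mean_pool(
    token_vectors: List[List[float]], attention_mask: List[int]
) -> List[float]:
    """Average the token vectors whose attention mask entry is 1."""
    kept = [vec for vec, keep in zip(token_vectors, attention_mask) if keep]
    if not kept:
        raise ValueError("attention_mask selects no tokens")
    dim = len(kept[0])
    return [sum(vec[i] for vec in kept) / len(kept) for i in range(dim)]


# Two real tokens and one padding token; the padding vector is ignored.
token_vectors = [[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]
attention_mask = [1, 1, 0]
sentence_vector = masked_mean_pool(token_vectors, attention_mask)
print(sentence_vector)
```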
After getting the embedding for each piece of Twitter text, the embeddings are normalized, because the trained model is expecting normalized embeddings.
{{< highlight >}}
    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())
)
{{< /highlight >}}
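The NormalizeEmbedding DoFn is not shown here. A minimal sketch of the idea, assuming L2 (unit-length) normalization, which is a usual choice before distance-based clustering; the actual DoFn may differ in detail, and l2_normalize is a hypothetical name.

```python
import math
from typing import List


def l2_normalize(vector: List[float], eps: float = 1e-12) -> List[float]:
    """Scale a vector to unit Euclidean length (eps guards against zero norm)."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / max(norm, eps) for x in vector]


unit = l2_normalize([3.0, 4.0])
print(unit)
```

After normalization, every embedding has length 1, so distances between embeddings depend only on their direction.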
The normalized embeddings are then forwarded to the trained HDBSCAN model to get the predictions.
{{< highlight >}}
predictions = (
    normalized_embedding
    | "Get Prediction from Clustering Model" >> RunInference(model_handler=clustering_model_handler)
)
{{< /highlight >}}
where clustering_model_handler is:
{{< highlight >}}
clustering_model_handler = KeyedModelHandler(
    CustomSklearnModelHandlerNumpy(
        model_uri=cfg.CLUSTERING_MODEL_PATH,
        model_file_type=ModelFileType.JOBLIB,
    )
)
{{< /highlight >}}
We define CustomSklearnModelHandlerNumpy as a wrapper to SklearnModelHandlerNumpy to limit batch size to one and to override run_inference so that hdbscan.approximate_predict() is used to get anomaly predictions.
{{< highlight file="sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" >}}
{{< code_sample "sdks/python/apache_beam/examples/inference/anomaly_detection/anomaly_detection_pipeline/pipeline/transformations.py" CustomSklearnModelHandlerNumpy >}}
{{< /highlight >}}
After getting the model predictions, decode the output from RunInference into a dictionary. Next, store the prediction in a BigQuery table for analysis, update the HDBSCAN model, and send an email alert if the prediction is an anomaly.
{{< highlight >}}
# Store every decoded prediction in BigQuery.
_ = (
    predictions
    | "Decode Prediction" >> beam.ParDo(DecodePrediction())
    | "Write to BQ" >> beam.io.WriteToBigQuery(
        table=cfg.TABLE_URI,
        schema=cfg.TABLE_SCHEMA,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)

# Send an email alert for predictions flagged as anomalies.
_ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
{{< /highlight >}}
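The DecodePrediction and TriggerEmailAlert DoFns are not shown here. The sketch below illustrates the decoding and the alert condition under simplifying assumptions: the prediction has already been unpacked to a plain cluster label, the key is a (text, id) tuple, and the row field names are hypothetical rather than taken from cfg.TABLE_SCHEMA.

```python
from typing import Dict, Tuple, Union

Row = Dict[str, Union[str, int]]


def decode_prediction(element: Tuple[Tuple[str, str], int]) -> Row:
    """Flatten ((text, id), cluster_label) into a row-like dictionary."""
    (text, uid), cluster = element
    return {"id": uid, "text": text, "cluster": int(cluster)}


def should_alert(row: Row) -> bool:
    """An alert is warranted only when the label marks an outlier (-1)."""
    return row["cluster"] == -1


normal_row = decode_prediction((("on a boat trip to denmark", "uid-1"), 2))
anomaly_row = decode_prediction((("i feel greedy wrong", "uid-2"), -1))
print(normal_row, should_alert(normal_row))
print(anomaly_row, should_alert(anomaly_row))
```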