{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Extracting Data from the StreamPipes data lake\n",
"\n",
"In the first tutorial ([Introduction to the StreamPipes Python client](../1-introduction-to-streampipes-python-client)) we took the first steps with the StreamPipes Python client and learned how to set everything up.\n",
"Now we are ready to retrieve some data from StreamPipes.\n",
"In this tutorial, we'll focus on the StreamPipes Data Lake, the component where StreamPipes stores data internally.\n",
"To get started, we'll use the `client` instance created in the first tutorial."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 1,
"outputs": [],
"source": [
"from streampipes.client import StreamPipesClient\n",
"from streampipes.client.config import StreamPipesClientConfig\n",
"from streampipes.client.credential_provider import StreamPipesApiKeyCredentials"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# if you want all necessary dependencies required for this tutorial to be installed,\n",
"# you can simply execute the following command\n",
"%pip install matplotlib streampipes"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 2,
"outputs": [],
"source": [
"import os\n",
"os.environ[\"SP_USERNAME\"] = \"admin@streampipes.apache.org\"\n",
"os.environ[\"SP_API_KEY\"] = \"XXX\"\n"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 3,
"outputs": [],
"source": [
"config = StreamPipesClientConfig(\n",
" credential_provider=StreamPipesApiKeyCredentials(),\n",
" host_address=\"localhost\",\n",
" https_disabled=True,\n",
" port=80\n",
")"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 4,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:25,860 - streampipes.client.client - [INFO] - [client.py:128] [_set_up_logging] - Logging successfully initialized with logging level INFO.\n"
]
}
],
"source": [
"client = StreamPipesClient(client_config=config)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"As a first step, we want to get an overview of all data available in the data lake.\n",
"The data is stored as so-called `measures`, each of which refers to a data stream stored in the data lake.\n",
"For this purpose, we use the `all()` method of the `dataLakeMeasureApi` endpoint."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 5,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:25,929 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:167] [_make_request] - Successfully retrieved all resources.\n"
]
}
],
"source": [
"data_lake_measures = client.dataLakeMeasureApi.all()"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"So let's see how many measures are available:"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 6,
"outputs": [
{
"data": {
"text/plain": "2"
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(data_lake_measures)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"All resources of the StreamPipes Python client support standard Python expressions such as `len()` and indexing. If you find one that does not, [please let us know](https://github.com/apache/streampipes/issues/new/choose)."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 7,
"outputs": [
{
"data": {
"text/plain": "DataLakeMeasure(element_id='3cb6b5e6f107452483d1fd2ccf4bf9f9', measure_name='test', timestamp_field='s0::timestamp', event_schema=EventSchema(event_properties=[EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:EiFnkL', label='Density', description='Denotes the current density of the fluid', runtime_name='density', required=False, domain_properties=['http://schema.org/Number'], property_scope='MEASUREMENT_PROPERTY', index=5, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#float', measurement_unit=None, value_specification=None), EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:ghSkQI', label='Mass Flow', description='Denotes the current mass flow in the sensor', runtime_name='mass_flow', required=False, domain_properties=['http://schema.org/Number'], property_scope='MEASUREMENT_PROPERTY', index=2, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#float', measurement_unit=None, value_specification=None), EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:cQAUry', label='Sensor ID', description='The ID of the sensor', runtime_name='sensorId', required=False, domain_properties=['https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId'], property_scope='DIMENSION_PROPERTY', index=1, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#string', measurement_unit=None, value_specification=None), EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:pbPMyL', label='Sensor Fault Flags', description='Any fault flags of the sensors', runtime_name='sensor_fault_flags', required=False, domain_properties=['http://schema.org/Boolean'], property_scope='MEASUREMENT_PROPERTY', index=6, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#boolean', measurement_unit=None, value_specification=None), EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:Qmayhw', label='Temperature', description='Denotes the current temperature in degrees celsius', runtime_name='temperature', required=False, domain_properties=['http://schema.org/Number'], property_scope='MEASUREMENT_PROPERTY', index=4, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#float', measurement_unit='http://qudt.org/vocab/unit#DegreeCelsius', value_specification=ValueSpecification(class_name='org.apache.streampipes.model.schema.QuantitativeValue', element_id=None, min_value=0, max_value=100, step=0.1)), EventProperty(class_name='org.apache.streampipes.model.schema.EventPropertyPrimitive', element_id='sp:eventproperty:YQYhjd', label='Volume Flow', description='Denotes the current volume flow', runtime_name='volume_flow', required=False, domain_properties=['http://schema.org/Number'], property_scope='MEASUREMENT_PROPERTY', index=3, runtime_id=None, runtime_type='http://www.w3.org/2001/XMLSchema#float', measurement_unit=None, value_specification=None)]), pipeline_id=None, pipeline_name=None, pipeline_is_running=False, schema_version='1.1')"
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data_lake_measures[-1]"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"To get a more comprehensive overview, you can take a look at the [`pandas`](https://pandas.pydata.org/) representation:"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 8,
"outputs": [
{
"data": {
"text/plain": " measure_name timestamp_field pipeline_id pipeline_name pipeline_is_running \\\n0 flow-rate s0::timestamp None None False \n1 test s0::timestamp None None False \n\n num_event_properties \n0 6 \n1 6 ",
"text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>measure_name</th>\n <th>timestamp_field</th>\n <th>pipeline_id</th>\n <th>pipeline_name</th>\n <th>pipeline_is_running</th>\n <th>num_event_properties</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>0</th>\n <td>flow-rate</td>\n <td>s0::timestamp</td>\n <td>None</td>\n <td>None</td>\n <td>False</td>\n <td>6</td>\n </tr>\n <tr>\n <th>1</th>\n <td>test</td>\n <td>s0::timestamp</td>\n <td>None</td>\n <td>None</td>\n <td>False</td>\n <td>6</td>\n </tr>\n </tbody>\n</table>\n</div>"
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(data_lake_measures.to_pandas())"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"So far, we have only retrieved metadata about the available data lake measures.\n",
"In the following, we will access the actual data of the measure `flow-rate`.\n",
"\n",
"For this purpose, we will use the `get()` method of the `dataLakeMeasureApi` endpoint."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 9,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:26,020 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:167] [_make_request] - Successfully retrieved all resources.\n"
]
}
],
"source": [
"flow_rate_measure = client.dataLakeMeasureApi.get(identifier=\"flow-rate\")"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"For further processing, the easiest way is to turn the data lake measure into a pandas `DataFrame`."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 10,
"outputs": [],
"source": [
"flow_rate_pd = flow_rate_measure.to_pandas()"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"Let's see how many data points we got..."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 11,
"outputs": [
{
"data": {
"text/plain": "1000"
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(flow_rate_pd)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"... and get a first overview"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 12,
"outputs": [
{
"data": {
"text/plain": " density mass_flow temperature volume_flow\ncount 1000.000000 1000.000000 1000.000000 1000.000000\nmean 45.560337 5.457014 45.480231 5.659558\nstd 3.201544 3.184959 3.132878 3.122437\nmin 40.007698 0.004867 40.000992 0.039422\n25% 42.819497 2.654101 42.754623 3.021625\n50% 45.679264 5.382355 45.435944 5.572553\n75% 48.206881 8.183144 48.248473 8.338209\nmax 50.998310 10.986015 50.964909 10.998676",
"text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>density</th>\n <th>mass_flow</th>\n <th>temperature</th>\n <th>volume_flow</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>count</th>\n <td>1000.000000</td>\n <td>1000.000000</td>\n <td>1000.000000</td>\n <td>1000.000000</td>\n </tr>\n <tr>\n <th>mean</th>\n <td>45.560337</td>\n <td>5.457014</td>\n <td>45.480231</td>\n <td>5.659558</td>\n </tr>\n <tr>\n <th>std</th>\n <td>3.201544</td>\n <td>3.184959</td>\n <td>3.132878</td>\n <td>3.122437</td>\n </tr>\n <tr>\n <th>min</th>\n <td>40.007698</td>\n <td>0.004867</td>\n <td>40.000992</td>\n <td>0.039422</td>\n </tr>\n <tr>\n <th>25%</th>\n <td>42.819497</td>\n <td>2.654101</td>\n <td>42.754623</td>\n <td>3.021625</td>\n </tr>\n <tr>\n <th>50%</th>\n <td>45.679264</td>\n <td>5.382355</td>\n <td>45.435944</td>\n <td>5.572553</td>\n </tr>\n <tr>\n <th>75%</th>\n <td>48.206881</td>\n <td>8.183144</td>\n <td>48.248473</td>\n <td>8.338209</td>\n </tr>\n <tr>\n <th>max</th>\n <td>50.998310</td>\n <td>10.986015</td>\n <td>50.964909</td>\n <td>10.998676</td>\n </tr>\n </tbody>\n</table>\n</div>"
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_rate_pd.describe()"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"As a final step, we want to plot two of the attributes, `mass_flow` and `temperature`."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 13,
"outputs": [
{
"data": {
"text/plain": "<Figure size 640x480 with 1 Axes>",
"image/png": "\n"
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"flow_rate_pd.plot(y=[\"mass_flow\", \"temperature\"])\n",
"plt.show()"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"For data lake measurements, the `get()` method can do more than simply return all the data for a given data lake measurement: it accepts additional query parameters. We will look at a selection of them below. The full list of supported parameters can be found in the [docs](../../reference/endpoint/api/data_lake_measure/#streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig). <br>\n",
"Let's start with the graph we created above, which uses only two columns of our data lake measurement. If we know in advance which columns we need, we can restrict the queried data to that subset by using the `columns` parameter. <br>\n",
"`columns` takes the column names as a single comma-separated string:"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 14,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:26,492 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:167] [_make_request] - Successfully retrieved all resources.\n"
]
},
{
"data": {
"text/plain": " timestamp mass_flow temperature\n0 2023-02-24T16:19:41.472Z 3.309556 44.448483\n1 2023-02-24T16:19:41.482Z 5.608580 40.322033\n2 2023-02-24T16:19:41.493Z 7.692881 49.239639\n3 2023-02-24T16:19:41.503Z 3.632898 49.933754\n4 2023-02-24T16:19:41.513Z 0.711260 50.106617\n.. ... ... ...\n995 2023-02-24T16:19:52.927Z 1.740114 46.558231\n996 2023-02-24T16:19:52.94Z 7.211723 48.048622\n997 2023-02-24T16:19:52.952Z 7.770180 48.188026\n998 2023-02-24T16:19:52.965Z 4.458602 48.280899\n999 2023-02-24T16:19:52.977Z 2.592060 47.505951\n\n[1000 rows x 3 columns]",
"text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>timestamp</th>\n <th>mass_flow</th>\n <th>temperature</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>0</th>\n <td>2023-02-24T16:19:41.472Z</td>\n <td>3.309556</td>\n <td>44.448483</td>\n </tr>\n <tr>\n <th>1</th>\n <td>2023-02-24T16:19:41.482Z</td>\n <td>5.608580</td>\n <td>40.322033</td>\n </tr>\n <tr>\n <th>2</th>\n <td>2023-02-24T16:19:41.493Z</td>\n <td>7.692881</td>\n <td>49.239639</td>\n </tr>\n <tr>\n <th>3</th>\n <td>2023-02-24T16:19:41.503Z</td>\n <td>3.632898</td>\n <td>49.933754</td>\n </tr>\n <tr>\n <th>4</th>\n <td>2023-02-24T16:19:41.513Z</td>\n <td>0.711260</td>\n <td>50.106617</td>\n </tr>\n <tr>\n <th>...</th>\n <td>...</td>\n <td>...</td>\n <td>...</td>\n </tr>\n <tr>\n <th>995</th>\n <td>2023-02-24T16:19:52.927Z</td>\n <td>1.740114</td>\n <td>46.558231</td>\n </tr>\n <tr>\n <th>996</th>\n <td>2023-02-24T16:19:52.94Z</td>\n <td>7.211723</td>\n <td>48.048622</td>\n </tr>\n <tr>\n <th>997</th>\n <td>2023-02-24T16:19:52.952Z</td>\n <td>7.770180</td>\n <td>48.188026</td>\n </tr>\n <tr>\n <th>998</th>\n <td>2023-02-24T16:19:52.965Z</td>\n <td>4.458602</td>\n <td>48.280899</td>\n </tr>\n <tr>\n <th>999</th>\n <td>2023-02-24T16:19:52.977Z</td>\n <td>2.592060</td>\n <td>47.505951</td>\n </tr>\n </tbody>\n</table>\n<p>1000 rows × 3 columns</p>\n</div>"
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_rate_pd = client.dataLakeMeasureApi.get(identifier=\"flow-rate\", columns=\"mass_flow,temperature\").to_pandas()\n",
"flow_rate_pd"
],
"metadata": {
"collapsed": false
}
},
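{
"cell_type": "markdown",
"source": [
"If the column names are already held in a Python list, they can be joined into the comma-separated string that `columns` expects.\n",
"The cell below is a minimal sketch using only built-in Python; the variable names `selected_columns` and `columns_param` are illustrative and not part of the StreamPipes client."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# illustrative list of column names, taken from the example above\n",
"selected_columns = [\"mass_flow\", \"temperature\"]\n",
"# join into a single comma-separated string without spaces\n",
"columns_param = \",\".join(selected_columns)\n",
"print(columns_param)\n",
"# the string can then be passed as in the call above:\n",
"# client.dataLakeMeasureApi.get(identifier=\"flow-rate\", columns=columns_param)"
],
"metadata": {
"collapsed": false
}
},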
{
"cell_type": "markdown",
"source": [
"By default, the client returns only the first one thousand records of a Data Lake measurement. This can be changed by passing a concrete value for the `limit` parameter:"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 15,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:26,736 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:167] [_make_request] - Successfully retrieved all resources.\n"
]
},
{
"data": {
"text/plain": "9528"
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flow_rate_pd = client.dataLakeMeasureApi.get(identifier=\"flow-rate\", limit=10000).to_pandas()\n",
"len(flow_rate_pd)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"If you want your data to be selected by time of occurrence rather than quantity, you can specify your time window by passing the `start_date` and `end_date` parameters:"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": 16,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2023-02-24 17:34:26,899 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:167] [_make_request] - Successfully retrieved all resources.\n"
]
},
{
"data": {
"text/plain": "<Figure size 640x480 with 1 Axes>",
"image/png": "\n"
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from datetime import datetime\n",
"flow_rate_pd = client.dataLakeMeasureApi.get(\n",
" identifier=\"flow-rate\",\n",
" start_date=datetime(year=2023, month=2, day=24, hour=17, minute=21, second=0),\n",
" end_date=datetime(year=2023, month=2, day=24, hour=17, minute=21, second=1),\n",
" ).to_pandas()\n",
"flow_rate_pd.plot(y=[\"mass_flow\", \"temperature\"])\n",
"plt.show()"
],
"metadata": {
"collapsed": false
}
},
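{
"cell_type": "markdown",
"source": [
"A time window is often easier to express as a start plus a duration.\n",
"The following sketch uses only `datetime` and `timedelta` from the standard library; the one-second duration matches the example above, and the names `window_start` and `window_end` are illustrative."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from datetime import datetime, timedelta\n",
"\n",
"# start of the window, identical to the example above\n",
"window_start = datetime(year=2023, month=2, day=24, hour=17, minute=21, second=0)\n",
"# derive the end from a duration instead of spelling it out\n",
"window_end = window_start + timedelta(seconds=1)\n",
"print(window_start, window_end)\n",
"# both values can then be passed as `start_date` and `end_date`:\n",
"# client.dataLakeMeasureApi.get(identifier=\"flow-rate\", start_date=window_start, end_date=window_end)"
],
"metadata": {
"collapsed": false
}
},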
{
"cell_type": "markdown",
"source": [
"... from this point on, we leave all future processing of the data up to your creativity.\n",
"Keep in mind: the general syntax used in this tutorial (`all()`, `to_pandas()`, `get()`) applies to all endpoints and associated resources of the StreamPipes Python client.\n",
"\n",
"If you build something exciting with data extracted from StreamPipes, please [let us know](https://github.com/apache/streampipes/discussions/categories/show-and-tell).\n",
"We are thrilled to see what you as a community will build with the provided client.\n",
"Furthermore, don't hesitate to discuss [feature requests](https://github.com/apache/streampipes/discussions/812) to extend the current functionality with us."
],
"metadata": {
"collapsed": false
}
},
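{
"cell_type": "markdown",
"source": [
"As one possible starting point for further processing, the `timestamp` strings shown in the output of the `columns` example can be parsed into `datetime` objects.\n",
"This is a sketch using only the standard library. It assumes that every value ends in `Z` (UTC) and carries fractional seconds, as in the two sample values copied from that output; the variable names are illustrative."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from datetime import datetime, timezone\n",
"\n",
"# two timestamp strings copied from the output of the `columns` example\n",
"raw_timestamps = [\"2023-02-24T16:19:52.927Z\", \"2023-02-24T16:19:52.94Z\"]\n",
"# assumption: trailing 'Z' means UTC; %f accepts one to six fractional digits\n",
"parsed = [\n",
"    datetime.strptime(ts, \"%Y-%m-%dT%H:%M:%S.%fZ\").replace(tzinfo=timezone.utc)\n",
"    for ts in raw_timestamps\n",
"]\n",
"# time elapsed between the two events\n",
"print(parsed[1] - parsed[0])"
],
"metadata": {
"collapsed": false
}
},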
{
"cell_type": "markdown",
"source": [
"For now, that's all about the StreamPipes client. Read the next tutorial ([Getting live data from the StreamPipes data stream](../3-getting-live-data-from-the-streampipes-data-stream)) if you are interested in making use of the powerful [StreamPipes functions](https://streampipes.apache.org/docs/extend-sdk-functions.html) to interact with StreamPipes in an event-based manner."
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"How do you like this tutorial?\n",
"We hope you like it and would love to receive some feedback from you.\n",
"Just go to our [GitHub discussion page](https://github.com/apache/streampipes/discussions) and let us know your impression.\n",
"We'll read and react to all of it, we promise!"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [],
"metadata": {
"collapsed": false
}
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
},
"license": "https://www.apache.org/licenses/LICENSE-2.0"
},
"nbformat": 4,
"nbformat_minor": 0
}