bigquery support
diff --git a/python-toolbox/marvin_python_toolbox/management/__init__.py b/python-toolbox/marvin_python_toolbox/management/__init__.py
index 8e3cb04..3b80667 100644
--- a/python-toolbox/marvin_python_toolbox/management/__init__.py
+++ b/python-toolbox/marvin_python_toolbox/management/__init__.py
@@ -28,6 +28,7 @@
from .notebook import cli as cli_notebook
from .hive import cli as cli_hive
from .engine import cli as cli_engine
+from .bigquery import cli as cli_bigquery
from ..config import parse_ini
from ..loader import load_commands_from_file
@@ -38,7 +39,7 @@
logger = get_logger('management')
-TOOL_EXCLUDE = ['engine-server', 'engine-dryrun', 'engine-httpserver', 'engine-grpcserver', 'engine-deploy', 'engine-httpserver-remote', 'pkg-showversion']
+TOOL_EXCLUDE = ['engine-server', 'engine-dryrun', 'engine-httpserver', 'engine-grpcserver', 'engine-deploy', 'engine-httpserver-remote', 'pkg-showversion', 'bigquery-dataimport']
PROD_EXCLUDE = ['test', 'test-tdd', 'test-tox', 'test-checkpep8', 'lab', 'notebook', 'pkg-bumpversion', 'pkg-createtag', 'pkg-showchanges', 'pkg-showinfo', 'pkg-updatedeps']
EXCLUDE_BY_TYPE = {
@@ -108,6 +109,7 @@
# Load internal commands
commands = {}
+ commands.update(cli_bigquery.commands)
commands.update(cli_pkg.commands)
commands.update(cli_test.commands)
commands.update(cli_notebook.commands)
diff --git a/python-toolbox/marvin_python_toolbox/management/bigquery.py b/python-toolbox/marvin_python_toolbox/management/bigquery.py
new file mode 100644
index 0000000..b18a7b8
--- /dev/null
+++ b/python-toolbox/marvin_python_toolbox/management/bigquery.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+# Copyright [2019] [Apache Software Foundation]
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import click
+import time
+import os
+import json
+import pandas as pd
+from google.cloud import bigquery
+import hashlib
+
+from .._logging import get_logger
+
+from .._compatibility import six
+
+
+logger = get_logger('management.bigquery')
+
+
+@click.group('bigquery')
+def cli():
+ pass
+
+def read_file(filename):
+ fname = os.path.join("", filename)
+ if os.path.exists(fname):
+
+ print("Engine file {} loaded!".format(filename))
+
+ with open(fname, 'r') as fp:
+ return json.load(fp)
+ else:
+ print("Engine file {} doesn't exists...".format(filename))
+ return {}
+
+@cli.command(
+ 'bigquery-dataimport',
+ help='Import data samples from a BigQuery database.')
+@click.option('--dryrun', '-d', is_flag=True, help='If it must run just to estimate costs')
+@click.option('--max_billed', '-m', default=10, help='Max bytes to be billed in the queries')
+@click.option('--metadata-file', '-mf', default='engine.metadata', help='Marvin engine metadata file path', type=click.Path(exists=True))
+@click.pass_context
+def bigquery_dataimport_cli(ctx, dryrun, max_billed, metadata_file):
+ bigquery_dataimport(ctx, metadata_file, dryrun, max_billed)
+
+def bigquery_dataimport(ctx, metadata_file, dryrun, max_billed):
+
+ initial_start_time = time.time()
+
+ metadata = read_file(metadata_file)
+
+ if metadata:
+ print(chr(27) + "[2J")
+
+ data_path = os.environ['MARVIN_DATA_PATH']
+ path_csvs = data_path + '/bigquery-' + metadata['bigquery_project']
+
+ if not dryrun:
+ os.mkdir(path_csvs)
+
+ for query, file in zip(metadata['bigquery_queries'], metadata['bigquery_csvfiles']):
+
+ print("project: {} query: {} file: {}".format(metadata['bigquery_project'], query, file))
+
+ bdi = BigQueryImporter(
+ project=metadata['bigquery_project'],
+ sql=query,
+ file=file,
+ # transform max_billed in bytes
+ max_billed=max_billed * 1073741824,
+ path_csv = path_csvs
+
+ )
+ if dryrun:
+ bdi.dryrun()
+ else:
+ bdi.query()
+
+ print("Total Time : {:.2f}s".format(time.time() - initial_start_time))
+
+ print("\n")
+
+
+def read_config(filename):
+ fname = os.path.join("", filename)
+ if os.path.exists(fname):
+ with open(fname, 'r') as fp:
+ return json.load(fp)[0]
+ else:
+ print("Configuration file {} doesn't exists...".format(filename))
+ return {}
+
+
+class BigQueryImporter():
+ def __init__(self, project, sql, file, max_billed, path_csv):
+ self.project = project
+ self.sql = sql
+ self.file = file
+ self.max_billed = max_billed
+ self.path_csv = path_csv
+
+ def query(self):
+ job_config = bigquery.QueryJobConfig()
+ job_config.use_query_cache = False
+ job_config.maximum_bytes_billed=self.max_billed
+ client = bigquery.Client(project=self.project)
+ query_job = client.query(self.sql,
+ job_config=job_config)
+ dataframe = query_job.to_dataframe()
+ dataframe.to_csv(self.path_csv + '/' + self.file, index=False)
+
+ def dryrun(self):
+ job_config = bigquery.QueryJobConfig()
+ job_config.use_query_cache = False
+ job_config.dry_run = True
+ job_config.maximum_bytes_billed=self.max_billed
+ client = bigquery.Client(project=self.project)
+ query_job = client.query(self.sql,
+ job_config=job_config)
+
+ assert query_job.state == "DONE"
+ assert query_job.dry_run
+
+ print("The query: {}\nWill process {} Gb.\n".format(self.sql, query_job.total_bytes_processed / 1073741824))
diff --git a/python-toolbox/marvin_python_toolbox/management/templates/python-engine/engine.metadata b/python-toolbox/marvin_python_toolbox/management/templates/python-engine/engine.metadata
index 88ecc2f..bb56837 100644
--- a/python-toolbox/marvin_python_toolbox/management/templates/python-engine/engine.metadata
+++ b/python-toolbox/marvin_python_toolbox/management/templates/python-engine/engine.metadata
@@ -58,5 +58,8 @@
"artifactsToPersist": [],
"artifactsToLoad": [],
"pipeline": []
- }]
+ }],
+ "bigquery_project": "xxx-project",
+ "bigquery_queries": [],
+ "bigquery_csvfiles": []
}
diff --git a/python-toolbox/setup.py b/python-toolbox/setup.py
index cec9ce1..3cafd6f 100644
--- a/python-toolbox/setup.py
+++ b/python-toolbox/setup.py
@@ -82,6 +82,7 @@
'idna>=2.5',
'bleach>=1.5.0',
'numpy>=1.16.2',
+ 'google-cloud-bigquery>=1.21',
]
# Test dependencies