[AIRFLOW-5730] Enable get_pandas_df on PinotDbApiHook (#6399)
diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
index 084722e..c4edc24 100644
--- a/airflow/contrib/hooks/pinot_hook.py
+++ b/airflow/contrib/hooks/pinot_hook.py
@@ -278,8 +278,5 @@
def set_autocommit(self, conn, autocommit):
raise NotImplementedError()
- def get_pandas_df(self, sql, parameters=None):
- raise NotImplementedError()
-
def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
raise NotImplementedError()
diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py
index 6229894..3765f7d 100644
--- a/tests/contrib/hooks/test_pinot_hook.py
+++ b/tests/contrib/hooks/test_pinot_hook.py
@@ -180,6 +180,7 @@
self.conn.conn_type = 'http'
self.conn.extra_dejson = {'endpoint': 'pql'}
self.cur = mock.MagicMock()
+ self.conn.cursor.return_value = self.cur
self.conn.__enter__.return_value = self.cur
self.conn.__exit__.return_value = None
@@ -220,3 +221,14 @@
result_sets = [('row1',), ('row2',)]
self.cur.fetchone.return_value = result_sets[0]
self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
+
+ def test_get_pandas_df(self):
+ statement = 'SQL'
+ column = 'col'
+ result_sets = [('row1',), ('row2',)]
+ self.cur.description = [(column,)]
+ self.cur.fetchall.return_value = result_sets
+ df = self.db_hook().get_pandas_df(statement)
+ self.assertEqual(column, df.columns[0])
+ for i in range(len(result_sets)): # pylint: disable=consider-using-enumerate
+ self.assertEqual(result_sets[i][0], df.values.tolist()[i][0])