[AIRFLOW-5730] Enable get_pandas_df on PinotDbApiHook (#6399)

diff --git a/airflow/contrib/hooks/pinot_hook.py b/airflow/contrib/hooks/pinot_hook.py
index 084722e..c4edc24 100644
--- a/airflow/contrib/hooks/pinot_hook.py
+++ b/airflow/contrib/hooks/pinot_hook.py
@@ -278,8 +278,5 @@
     def set_autocommit(self, conn, autocommit):
         raise NotImplementedError()
 
-    def get_pandas_df(self, sql, parameters=None):
-        raise NotImplementedError()
-
     def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
         raise NotImplementedError()
diff --git a/tests/contrib/hooks/test_pinot_hook.py b/tests/contrib/hooks/test_pinot_hook.py
index 6229894..3765f7d 100644
--- a/tests/contrib/hooks/test_pinot_hook.py
+++ b/tests/contrib/hooks/test_pinot_hook.py
@@ -180,6 +180,7 @@
         self.conn.conn_type = 'http'
         self.conn.extra_dejson = {'endpoint': 'pql'}
         self.cur = mock.MagicMock()
+        self.conn.cursor.return_value = self.cur
         self.conn.__enter__.return_value = self.cur
         self.conn.__exit__.return_value = None
 
@@ -220,3 +221,14 @@
         result_sets = [('row1',), ('row2',)]
         self.cur.fetchone.return_value = result_sets[0]
         self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
+
+    def test_get_pandas_df(self):
+        statement = 'SQL'
+        column = 'col'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.description = [(column,)]
+        self.cur.fetchall.return_value = result_sets
+        df = self.db_hook().get_pandas_df(statement)
+        self.assertEqual(column, df.columns[0])
+        for i in range(len(result_sets)):  # pylint: disable=consider-using-enumerate
+            self.assertEqual(result_sets[i][0], df.values.tolist()[i][0])