| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Flights data preparation" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [ |
| "from pyspark.sql import SQLContext\n", |
| "from pyspark.sql import DataFrame\n", |
| "from pyspark.sql import Row\n", |
| "from pyspark.sql.types import *\n", |
| "import pandas as pd\n", |
| "import StringIO\n", |
| "import matplotlib.pyplot as plt\n", |
| "hc = sc._jsc.hadoopConfiguration()\n", |
| "hc.set(\"hive.execution.engine\", \"mr\")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Function to parse CSV" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import csv\n", |
| "\n", |
| "def parseCsv(csvStr):\n", |
| " f = StringIO.StringIO(csvStr)\n", |
| " reader = csv.reader(f, delimiter=',')\n", |
| " row = reader.next()\n", |
| " return row\n", |
| "\n", |
| "scsv = '\"02Q\",\"Titan Airways\"'\n", |
| "row = parseCsv(scsv)\n", |
| "print row[0]\n", |
| "print row[1]\n", |
| "\n", |
| "working_storage = 'WORKING_STORAGE'\n", |
| "output_directory = 'jupyter/py2'\n", |
| "protocol_name = 'PROTOCOL_NAME://'" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Parse and convert Carrier data to parquet" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "carriersHeader = 'Code,Description'\n", |
| "carriersText = sc.textFile(protocol_name + working_storage + \"/jupyter_dataset/carriers.csv\").filter(lambda x: x != carriersHeader)\n", |
| "carriers = carriersText.map(lambda s: parseCsv(s)) \\\n", |
| " .map(lambda s: Row(code=s[0], description=s[1])).cache().toDF()\n", |
| "carriers.write.mode(\"overwrite\").parquet(protocol_name + working_storage + \"/\" + output_directory + \"/carriers\") \n", |
| "sqlContext.registerDataFrameAsTable(carriers, \"carriers\")\n", |
| "carriers.limit(20).toPandas()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Parse and convert Airport data to parquet" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "airportsHeader= '\"iata\",\"airport\",\"city\",\"state\",\"country\",\"lat\",\"long\"'\n", |
| "airports = sc.textFile(protocol_name + working_storage + \"/jupyter_dataset/airports.csv\") \\\n", |
| " .filter(lambda x: x != airportsHeader) \\\n", |
| " .map(lambda s: parseCsv(s)) \\\n", |
| " .map(lambda p: Row(iata=p[0], \\\n", |
| " airport=p[1], \\\n", |
| " city=p[2], \\\n", |
| " state=p[3], \\\n", |
| " country=p[4], \\\n", |
| " lat=float(p[5]), \\\n", |
| " longt=float(p[6])) \\\n", |
| " ).cache().toDF()\n", |
| "airports.write.mode(\"overwrite\").parquet(protocol_name + working_storage + \"/\" + output_directory + \"/airports\") \n", |
| "sqlContext.registerDataFrameAsTable(airports, \"airports\")\n", |
| "airports.limit(20).toPandas()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Parse and convert Flights data to parquet" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "flightsHeader = 'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'\n", |
| "flights = sc.textFile(protocol_name + working_storage + \"/jupyter_dataset/2008.csv.bz2\") \\\n", |
| " .filter(lambda x: x!= flightsHeader) \\\n", |
| " .map(lambda s: parseCsv(s)) \\\n", |
| " .map(lambda p: Row(Year=int(p[0]), \\\n", |
| " Month=int(p[1]), \\\n", |
| " DayofMonth=int(p[2]), \\\n", |
| " DayOfWeek=int(p[3]), \\\n", |
| " DepTime=p[4], \\\n", |
| " CRSDepTime=p[5], \\\n", |
| " ArrTime=p[6], \\\n", |
| " CRSArrTime=p[7], \\\n", |
| " UniqueCarrier=p[8], \\\n", |
| " FlightNum=p[9], \\\n", |
| " TailNum=p[10], \\\n", |
| " ActualElapsedTime=p[11], \\\n", |
| " CRSElapsedTime=p[12], \\\n", |
| " AirTime=p[13], \\\n", |
| " ArrDelay=int(p[14].replace(\"NA\", \"0\")), \\\n", |
| " DepDelay=int(p[15].replace(\"NA\", \"0\")), \\\n", |
| " Origin=p[16], \\\n", |
| " Dest=p[17], \\\n", |
| " Distance=long(p[18]), \\\n", |
| " TaxiIn=p[19], \\\n", |
| " TaxiOut=p[20], \\\n", |
| " Cancelled=p[21], \\\n", |
| " CancellationCode=p[22], \\\n", |
| " Diverted=p[23], \\\n", |
| " CarrierDelay=int(p[24].replace(\"NA\", \"0\")), \\\n", |
| " CarrierDelayStr=p[24], \\\n", |
| " WeatherDelay=int(p[25].replace(\"NA\", \"0\")), \\\n", |
| " WeatherDelayStr=p[25], \\\n", |
| " NASDelay=int(p[26].replace(\"NA\", \"0\")), \\\n", |
| " SecurityDelay=int(p[27].replace(\"NA\", \"0\")), \\\n", |
| " LateAircraftDelay=int(p[28].replace(\"NA\", \"0\")))) \\\n", |
| " .toDF()\n", |
| "\n", |
| "flights.write.mode(\"ignore\").parquet(protocol_name + working_storage + \"/\" + output_directory + \"/flights\")\n", |
| "sqlContext.registerDataFrameAsTable(flights, \"flights\")\n", |
| "flights.limit(10).toPandas()[[\"ArrDelay\",\"CarrierDelay\",\"CarrierDelayStr\",\"WeatherDelay\",\"WeatherDelayStr\",\"Distance\"]]" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "collapsed": true |
| }, |
| "outputs": [], |
| "source": [] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 2", |
| "language": "python", |
| "name": "KERNEL_NAME" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 2 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython2", |
| "version": "2.7.13" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 1 |
| } |