blob: d0360a1dad017fd314e4ceafa8e2edde9446803b [file] [log] [blame]
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Flights data preparation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Standard library\n",
"import StringIO\n",
"\n",
"# Third-party\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"from pyspark.sql import DataFrame, Row, SQLContext\n",
"from pyspark.sql.types import *\n",
"\n",
"# Set hive.execution.engine to \"mr\" on the Hadoop configuration obtained from sc.\n",
"hc = sc._jsc.hadoopConfiguration()\n",
"hc.set(\"hive.execution.engine\", \"mr\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Function to parse CSV"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"\n",
"def parseCsv(csvStr):\n",
"    '''Parse one line of CSV text into a list of field strings.\n",
"\n",
"    csvStr is a single comma-delimited record whose fields may be\n",
"    double-quoted; it must not contain embedded line breaks.\n",
"    Returns the list of fields, or an empty list for an empty line.\n",
"    '''\n",
"    # csv.reader accepts any iterable of lines, so a one-element list\n",
"    # is enough and no StringIO buffer has to be built per record.\n",
"    reader = csv.reader([csvStr], delimiter=',')\n",
"    # The default guards against StopIteration escaping from here: raised\n",
"    # inside an RDD map it can be mistaken for the end of the partition.\n",
"    return next(reader, [])\n",
"\n",
"# Smoke test on a quoted two-field record.\n",
"scsv = '\"02Q\",\"Titan Airways\"'\n",
"row = parseCsv(scsv)\n",
"print(row[0])\n",
"print(row[1])\n",
"\n",
"# Storage settings that the cells below concatenate into input/output paths.\n",
"# NOTE(review): 'WORKING_STORAGE' and 'PROTOCOL_NAME://' look like template\n",
"# placeholders substituted before the notebook is run - confirm.\n",
"working_storage = 'WORKING_STORAGE'\n",
"output_directory = 'jupyter/py2'\n",
"protocol_name = 'PROTOCOL_NAME://'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse and convert Carrier data to parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"carriersHeader = 'Code,Description'\n",
"carriers_source = protocol_name + working_storage + \"/jupyter_dataset/carriers.csv\"\n",
"carriers_target = protocol_name + working_storage + \"/\" + output_directory + \"/carriers\"\n",
"\n",
"# Read the raw lines and drop the one that equals the header.\n",
"carriersText = (\n",
"    sc.textFile(carriers_source)\n",
"    .filter(lambda line: line != carriersHeader)\n",
")\n",
"\n",
"# Parse every remaining line into a Row(code, description) and build a DataFrame.\n",
"carriers = (\n",
"    carriersText\n",
"    .map(parseCsv)\n",
"    .map(lambda fields: Row(code=fields[0], description=fields[1]))\n",
"    .cache()\n",
"    .toDF()\n",
")\n",
"\n",
"# Persist as parquet, register for SQL queries and preview the first rows.\n",
"carriers.write.mode(\"overwrite\").parquet(carriers_target)\n",
"sqlContext.registerDataFrameAsTable(carriers, \"carriers\")\n",
"carriers.limit(20).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse and convert Airport data to parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"airportsHeader = '\"iata\",\"airport\",\"city\",\"state\",\"country\",\"lat\",\"long\"'\n",
"airports_source = protocol_name + working_storage + \"/jupyter_dataset/airports.csv\"\n",
"airports_target = protocol_name + working_storage + \"/\" + output_directory + \"/airports\"\n",
"\n",
"# Read the raw lines, drop the header, parse each record and convert the\n",
"# two coordinate fields to float while building the Row.\n",
"airports = (\n",
"    sc.textFile(airports_source)\n",
"    .filter(lambda line: line != airportsHeader)\n",
"    .map(parseCsv)\n",
"    .map(lambda f: Row(iata=f[0],\n",
"                       airport=f[1],\n",
"                       city=f[2],\n",
"                       state=f[3],\n",
"                       country=f[4],\n",
"                       lat=float(f[5]),\n",
"                       longt=float(f[6])))\n",
"    .cache()\n",
"    .toDF()\n",
")\n",
"\n",
"# Persist as parquet, register for SQL queries and preview the first rows.\n",
"airports.write.mode(\"overwrite\").parquet(airports_target)\n",
"sqlContext.registerDataFrameAsTable(airports, \"airports\")\n",
"airports.limit(20).toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse and convert Flights data to parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"flightsHeader = 'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'\n",
"flights_source = protocol_name + working_storage + \"/jupyter_dataset/2008.csv.bz2\"\n",
"flights_target = protocol_name + working_storage + \"/\" + output_directory + \"/flights\"\n",
"\n",
"\n",
"def _int_or_zero(value):\n",
"    '''Return int(value) after replacing every \"NA\" in value with \"0\".'''\n",
"    return int(value.replace(\"NA\", \"0\"))\n",
"\n",
"\n",
"def _flight_row(p):\n",
"    '''Build a flights Row from the list of fields of one parsed CSV record.'''\n",
"    return Row(\n",
"        Year=int(p[0]),\n",
"        Month=int(p[1]),\n",
"        DayofMonth=int(p[2]),\n",
"        DayOfWeek=int(p[3]),\n",
"        DepTime=p[4],\n",
"        CRSDepTime=p[5],\n",
"        ArrTime=p[6],\n",
"        CRSArrTime=p[7],\n",
"        UniqueCarrier=p[8],\n",
"        FlightNum=p[9],\n",
"        TailNum=p[10],\n",
"        ActualElapsedTime=p[11],\n",
"        CRSElapsedTime=p[12],\n",
"        AirTime=p[13],\n",
"        ArrDelay=_int_or_zero(p[14]),\n",
"        DepDelay=_int_or_zero(p[15]),\n",
"        Origin=p[16],\n",
"        Dest=p[17],\n",
"        Distance=long(p[18]),\n",
"        TaxiIn=p[19],\n",
"        TaxiOut=p[20],\n",
"        Cancelled=p[21],\n",
"        CancellationCode=p[22],\n",
"        Diverted=p[23],\n",
"        CarrierDelay=_int_or_zero(p[24]),\n",
"        # The *Str columns keep the unconverted text of the same field.\n",
"        CarrierDelayStr=p[24],\n",
"        WeatherDelay=_int_or_zero(p[25]),\n",
"        WeatherDelayStr=p[25],\n",
"        NASDelay=_int_or_zero(p[26]),\n",
"        SecurityDelay=_int_or_zero(p[27]),\n",
"        LateAircraftDelay=_int_or_zero(p[28]),\n",
"    )\n",
"\n",
"\n",
"# Read the raw lines, drop the header, parse each record and build a DataFrame.\n",
"flights = (\n",
"    sc.textFile(flights_source)\n",
"    .filter(lambda line: line != flightsHeader)\n",
"    .map(parseCsv)\n",
"    .map(_flight_row)\n",
"    .toDF()\n",
")\n",
"\n",
"# NOTE(review): save mode \"ignore\" leaves an existing flights output untouched,\n",
"# whereas the carriers and airports cells use \"overwrite\" - confirm this is intended.\n",
"flights.write.mode(\"ignore\").parquet(flights_target)\n",
"sqlContext.registerDataFrameAsTable(flights, \"flights\")\n",
"\n",
"# Preview the converted delay columns next to their raw string counterparts.\n",
"preview_columns = [\"ArrDelay\", \"CarrierDelay\", \"CarrierDelayStr\", \"WeatherDelay\", \"WeatherDelayStr\", \"Distance\"]\n",
"flights.limit(10).toPandas()[preview_columns]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "KERNEL_NAME"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 1
}