{"nbformat_minor": 0, "cells": [{"source": "# Flight Delay Prediction Demo Using SystemML", "cell_type": "markdown", "metadata": {}}, {"source": "This notebook is based on datascientistworkbench.com's tutorial notebook for predicting flight delay.", "cell_type": "markdown", "metadata": {}}, {"source": "## Loading SystemML ", "cell_type": "markdown", "metadata": {}}, {"source": "To use one of the released version, use \"%AddDeps org.apache.systemml systemml 0.9.0-incubating\". To use nightly build, \"%AddJar https://sparktc.ibmcloud.com/repo/latest/SystemML.jar\"\n\nOr you provide SystemML.jar and dependency through commandline when starting the notebook (for example: --packages com.databricks:spark-csv_2.10:1.4.0 --jars SystemML.jar)", "cell_type": "markdown", "metadata": {}}, {"execution_count": 1, "cell_type": "code", "source": "%AddJar https://sparktc.ibmcloud.com/repo/latest/SystemML.jar", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Using cached version of SystemML.jar\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Use Spark's CSV package for loading the CSV file", "cell_type": "markdown", "metadata": {}}, {"execution_count": 2, "cell_type": "code", "source": "%AddDeps com.databricks spark-csv_2.10 1.4.0", "outputs": [{"output_type": "stream", "name": "stdout", "text": ":: loading settings :: url = jar:file:/usr/local/spark-kernel/lib/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n:: resolving dependencies :: com.ibm.spark#spark-kernel;working [not transitive]\n\tconfs: [default]\n\tfound com.databricks#spark-csv_2.10;1.4.0 in central\n:: resolution report :: resolve 98ms :: artifacts dl 5ms\n\t:: modules in use:\n\tcom.databricks#spark-csv_2.10;1.4.0 from central in [default]\n\t---------------------------------------------------------------------\n\t| | modules || artifacts |\n\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n\t---------------------------------------------------------------------\n\t| default | 1 | 0 | 0 | 0 || 1 | 0 |\n\t---------------------------------------------------------------------\n:: retrieving :: com.ibm.spark#spark-kernel\n\tconfs: [default]\n\t0 artifacts copied, 1 already retrieved (0kB/10ms)\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "## Import Data", "cell_type": "markdown", "metadata": {"collapsed": true}}, {"source": "Download the airline dataset from stat-computing.org if not already downloaded", "cell_type": "markdown", "metadata": {}}, {"execution_count": 3, "cell_type": "code", "source": "import sys.process._\nimport java.net.URL\nimport java.io.File\nval url = \"http://stat-computing.org/dataexpo/2009/2007.csv.bz2\"\nval localFilePath = \"airline2007.csv.bz2\"\nif(!new java.io.File(localFilePath).exists) {\n new URL(url) #> new File(localFilePath) !!\n}", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Load the dataset into DataFrame using Spark CSV package", "cell_type": "markdown", "metadata": {}}, {"execution_count": null, "cell_type": "code", "source": "import org.apache.spark.sql.SQLContext\nval sqlContext = new SQLContext(sc)\nval fmt = sqlContext.read.format(\"com.databricks.spark.csv\")\nval opt = fmt.options(Map(\"header\"->\"true\", \"inferSchema\"->\"true\"))\nval airline = opt.load(localFilePath).na.replace( \"*\", Map(\"NA\" -> \"0.0\") )", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": null, "cell_type": "code", "source": "airline.printSchema", "outputs": [], 
"metadata": {"collapsed": false, "trusted": true}}, {"source": "## Data Exploration\nWhich airports have the most delays?", "cell_type": "markdown", "metadata": {}}, {"execution_count": null, "cell_type": "code", "source": "airline.registerTempTable(\"airline\")\nsqlContext.sql(\"\"\"SELECT Origin, count(*) conFlight, avg(DepDelay) delay\n FROM airline\n GROUP BY Origin\n ORDER BY delay DESC\"\"\").show", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "## Modeling: Logistic Regression\n\nPredict departure delays of greater than 15 of flights from JFK", "cell_type": "markdown", "metadata": {}}, {"execution_count": null, "cell_type": "code", "source": "sqlContext.udf.register(\"checkDelay\", (depDelay:String) => try { if(depDelay.toDouble > 15) 1.0 else 2.0 } catch { case e:Exception => 1.0 })\nval smallAirlineData = sqlContext.sql(\"SELECT *, checkDelay(DepDelay) label FROM airline WHERE Origin = 'JFK'\")\nval datasets = smallAirlineData.randomSplit(Array(0.7, 0.3))\nval trainDataset = datasets(0).cache\nval testDataset = datasets(1).cache\ntrainDataset.count\ntestDataset.count", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "### Feature selection", "cell_type": "markdown", "metadata": {}}, {"source": "Encode the destination using one-hot encoding and include the columns Year, Month, DayofMonth, DayOfWeek, Distance", "cell_type": "markdown", "metadata": {}}, {"execution_count": null, "cell_type": "code", "source": "import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}\n\nval indexer = new StringIndexer().setInputCol(\"Dest\").setOutputCol(\"DestIndex\") // .setHandleInvalid(\"skip\") // Only works on Spark 1.6 or later\nval encoder = new OneHotEncoder().setInputCol(\"DestIndex\").setOutputCol(\"DestVec\")\nval assembler = new VectorAssembler().setInputCols(Array(\"Year\",\"Month\",\"DayofMonth\",\"DayOfWeek\",\"Distance\",\"DestVec\")).setOutputCol(\"features\")", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "### Build the model: Use SystemML's MLPipeline wrapper. \n\nThis wrapper invokes MultiLogReg.dml (for training) and GLM-predict.dml (for prediction). 
### Build the model: use SystemML's MLPipeline wrapper

This wrapper invokes MultiLogReg.dml (for training) and GLM-predict.dml (for prediction). These DML algorithms are available at https://github.com/apache/incubator-systemml/tree/master/scripts/algorithms.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.sysml.api.ml.LogisticRegression

val lr = new LogisticRegression("log", sc).setRegParam(1e-4).setTol(1e-2).setMaxInnerIter(5).setMaxOuterIter(5)

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
val model = pipeline.fit(trainDataset)
```

Training output:

```
BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially: Objective = 61247.87116863789, Gradient Norm = 4.86977583580406E7, Trust Delta = 0.0013049801226298022
-- Outer Iteration 1: Had 1 CG iterations
 -- Obj.Reduction: Actual = 10085.186599774679, Predicted = 9703.748642685421 (A/P: 1.0393), Trust Delta = 4.148360874623699E-4
 -- New Objective = 51162.68456886321, Beta Change Norm = 3.9852958205347075E-4, Gradient Norm = 3857928.1315281712
-- Outer Iteration 2: Had 2 CG iterations
 -- Obj.Reduction: Actual = 140.7058278433251, Predicted = 138.05703502188976 (A/P: 1.0192), Trust Delta = 4.148360874623699E-4
 -- New Objective = 51021.978741019884, Beta Change Norm = 1.251386078420451E-4, Gradient Norm = 110384.84459596197
Termination / Convergence condition satisfied.
```

### Evaluate the model

Output the RMS error on the test data. Because the labels are the class indices 1.0 and 2.0, the squared difference per row is either 0 or 1, so this RMS error is simply the square root of the misclassification rate.

```scala
val predictions = model.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
predictions.registerTempTable("predictions")
sqlContext.sql("SELECT sqrt(avg(pow(OriginalLabel - label, 2.0))) FROM predictions").show
```

### Perform k-fold cross-validation to tune the hyperparameters

Perform cross-validation to tune the regularization parameter of the logistic regression model.

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.1, 1e-3, 1e-6)).build()
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(2) // Setting k = 2
val cvmodel = crossval.fit(trainDataset)
```

### Evaluate the cross-validated model

```scala
val cvpredictions = cvmodel.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
cvpredictions.registerTempTable("cvpredictions")
sqlContext.sql("SELECT sqrt(avg(pow(OriginalLabel - label, 2.0))) FROM cvpredictions").show
```

## Homework ;)

Read http://apache.github.io/incubator-systemml/algorithms-classification.html#multinomial-logistic-regression and perform cross-validation on other hyperparameters, for example icpt, tol, maxOuterIter, and maxInnerIter. A sketch of an extended grid follows.
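A minimal starting point, reusing the imports from the cross-validation cell above. This assumes the SystemML `LogisticRegression` wrapper exposes `tol` and `maxOuterIter` as `Param`s analogous to the `regParam` used earlier; the corresponding setters appear in this notebook, but these `Param` names are an assumption.

```scala
// Sketch: a wider grid over regularization, tolerance, and outer iterations.
// lr.tol and lr.maxOuterIter are assumed Params matching the setters above.
val widerGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 1e-3, 1e-6))
  .addGrid(lr.tol, Array(1e-2, 1e-4))
  .addGrid(lr.maxOuterIter, Array(5, 20))
  .build()

val crossval2 = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(widerGrid)
  .setNumFolds(3)
val tunedCvModel = crossval2.fit(trainDataset)
```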
{"kernelspec": {"display_name": "Scala 2.10.4 (Spark 1.5.2)", "name": "spark", "language": "scala"}, "language_info": {"name": "scala"}}}