diff --git a/01-PySpark-Get-Started.ipynb b/01-PySpark-Get-Started.ipynb
index 1a8e703..429faea 100644
--- a/01-PySpark-Get-Started.ipynb
+++ b/01-PySpark-Get-Started.ipynb
@@ -2,17 +2,63 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 1,
- "id": "fe9fd1c0-db30-47b1-bbe2-0b1cbd97a9e2",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"01-PySpark-Get-Started\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA BOOTSTRAP \u2014 run once per session after setup cell\n",
+ "# WHY: Colab's filesystem resets each session. This downloads the NYC TLC\n",
+ "# Yellow Taxi dataset \u2014 a real-world financial dataset with 19 columns\n",
+ "# including fare_amount, tip_amount, payment_type, and datetime fields.\n",
+ "# It replaces the synthetic sample files from the original repo which\n",
+ "# were too small and too simple to demonstrate meaningful PySpark operations.\n",
+ "# Run this notebook first. All subsequent notebooks reference /content/data/.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC Yellow Taxi Jan 2023 (~50MB, ~3M rows)\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ "\n",
+ "trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ "\n",
+ "# CSV version for CSV exercises\n",
+ "trips.limit(10000).write.mode('overwrite').option('header', True).csv('/content/data/trips_csv')\n",
+ "\n",
+ "# JSON version for JSON exercises\n",
+ "trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "# Text version for word count / text exercises\n",
+ "with open('/content/data/data.txt', 'w') as f:\n",
+ " f.write('hello world\\nhello spark\\nspark is fast\\nworld of spark\\nhello hello world\\n')\n",
+ "\n",
+ "print(f'Dataset ready: {trips.count():,} rows, {len(trips.columns)} columns')\n",
+ "trips.printSchema()"
]
},
{
diff --git a/02-Create-SparkContext.ipynb b/02-Create-SparkContext.ipynb
index a64dc6f..ed415e3 100644
--- a/02-Create-SparkContext.ipynb
+++ b/02-Create-SparkContext.ipynb
@@ -3,16 +3,55 @@
{
"cell_type": "code",
"execution_count": null,
- "id": "9b42a8f0-9a63-461c-95dc-27847f5b0a40",
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"02-Create-SparkContext\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
diff --git a/03-Create-SparkSession.ipynb b/03-Create-SparkSession.ipynb
index 80282dd..5c93072 100644
--- a/03-Create-SparkSession.ipynb
+++ b/03-Create-SparkSession.ipynb
@@ -3,16 +3,55 @@
{
"cell_type": "code",
"execution_count": null,
- "id": "5f80ab89-af0e-4139-8f6a-0a382310f34c",
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"03-Create-SparkSession\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
diff --git a/04-RDD-Operations.ipynb b/04-RDD-Operations.ipynb
index d53c788..8b7a913 100644
--- a/04-RDD-Operations.ipynb
+++ b/04-RDD-Operations.ipynb
@@ -2,17 +2,56 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 1,
- "id": "5c0ddfc8-a1d1-4bb4-9cd5-180f11e4f3af",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"04-RDD-Operations\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
@@ -256,8 +281,7 @@
}
],
"source": [
- "result = mapped_rdd.collect()\n",
- "print(\"rdd with uppercease name: \", result)"
+ "result = mapped_rdd.collect()\nprint(\"rdd with uppercease name: \", result)"
]
},
{
diff --git a/05-DataFrame-Intro.ipynb b/05-DataFrame-Intro.ipynb
index b75fe8d..d5324f8 100644
--- a/05-DataFrame-Intro.ipynb
+++ b/05-DataFrame-Intro.ipynb
@@ -2,17 +2,56 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 37,
- "id": "68a6d7a7-693c-4fae-804f-3d92a1a30e35",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"05-DataFrame-Intro\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
@@ -60,11 +97,7 @@
}
],
"source": [
- "rdd = spark.sparkContext.textFile(\"./data/data.txt\")\n",
- "result_rdd = rdd.flatMap(lambda line: line.split(\" \")) \\\n",
- " .map(lambda word: (word, 1)) \\\n",
- " .reduceByKey(lambda a, b: a + b) \\\n",
- " .sortBy(lambda x: x[1], ascending=False)"
+ "rdd = spark.sparkContext.textFile(\"/content/data/data.txt\")\nresult_rdd = rdd.flatMap(lambda line: line.split(\" \")) \\\n .map(lambda word: (word, 1)) \\\n .reduceByKey(lambda a, b: a + b) \\\n .sortBy(lambda x: x[1], ascending=False)"
]
},
{
@@ -112,10 +145,7 @@
"metadata": {},
"outputs": [],
"source": [
- "df = spark.read.text(\"./data/data.txt\")\n",
- "\n",
- "result_df = df.selectExpr(\"explode(split(value, ' ')) as word\") \\\n",
- " .groupBy(\"word\").count().orderBy(desc(\"count\"))"
+ "df = spark.read.text(\"/content/data/data.txt\")\n\nresult_df = df.selectExpr(\"explode(split(value, ' ')) as word\") \\\n .groupBy(\"word\").count().orderBy(desc(\"count\"))"
]
},
{
diff --git a/06-DataFrame-from-various-data-source.ipynb b/06-DataFrame-from-various-data-source.ipynb
index 83d8776..e5a560b 100644
--- a/06-DataFrame-from-various-data-source.ipynb
+++ b/06-DataFrame-from-various-data-source.ipynb
@@ -2,17 +2,56 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 1,
- "id": "3d303751-5833-413a-8698-7d9cc74001cc",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"06-DataFrame-from-various-data-source\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
@@ -70,8 +106,7 @@
}
],
"source": [
- "%%bash \n",
- "head -10 ./data/products.csv"
+ "%%bash \nhead -10 /content/data/products.csv"
]
},
{
@@ -89,9 +124,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Read CSV file into DataFrame\n",
- "csv_file_path = \"./data/products.csv\"\n",
- "df = spark.read.csv(csv_file_path, header=True)"
+ "# Read CSV file into DataFrame\ncsv_file_path = \"/content/data/products.csv\"\ndf = spark.read.csv(csv_file_path, header=True)"
]
},
{
@@ -176,9 +197,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Read CSV file into DataFrame with schema definition\n",
- "csv_file_path = \"./data/products.csv\"\n",
- "df = spark.read.csv(csv_file_path, header=True, schema=schema)"
+ "# Read CSV file into DataFrame with schema definition\ncsv_file_path = \"/content/data/products.csv\"\ndf = spark.read.csv(csv_file_path, header=True, schema=schema)"
]
},
{
@@ -235,9 +250,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Read CSV file into DataFrame with inferSchema\n",
- "csv_file_path = \"./data/products.csv\"\n",
- "df = spark.read.csv(csv_file_path, header=True, inferSchema=True)"
+ "# Read CSV file into DataFrame with inferSchema\ncsv_file_path = \"/content/data/products.csv\"\ndf = spark.read.csv(csv_file_path, header=True, inferSchema=True)"
]
},
{
@@ -319,8 +328,7 @@
}
],
"source": [
- "%%bash\n",
- "head -10 data/products_singleline.json"
+ "%%bash\nhead -10 data/products_singleline.json"
]
},
{
@@ -330,10 +338,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Read single line JSON\n",
- "# Each row is a JSON record, records are separated by new line\n",
- "json_file_path = \"./data/products_singleline.json\"\n",
- "df = spark.read.json(json_file_path)"
+ "# Read single line JSON\n# Each row is a JSON record, records are separated by new line\njson_file_path = \"/content/data/products_singleline.json\"\ndf = spark.read.json(json_file_path)"
]
},
{
@@ -417,8 +418,7 @@
}
],
"source": [
- "%%bash\n",
- "head -20 data/products_multiline.json"
+ "%%bash\nhead -20 data/products_multiline.json"
]
},
{
@@ -428,11 +428,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Read multi-line JSON\n",
- "# JSON is an array of record, records are separated by a comma.\n",
- "# each record is defined in multiple lines\n",
- "json_file_path = \"./data/products_multiline.json\"\n",
- "df = spark.read.json(json_file_path, multiLine=True)"
+ "# Read multi-line JSON\n# JSON is an array of record, records are separated by a comma.\n# each record is defined in multiple lines\njson_file_path = \"/content/data/products_multiline.json\"\ndf = spark.read.json(json_file_path, multiLine=True)"
]
},
{
@@ -489,9 +481,7 @@
}
],
"source": [
- "# write dataframe into parquet file\n",
- "parquet_file_path = \"./data/products.parquet\"\n",
- "df.write.parquet(parquet_file_path)"
+ "# write dataframe into parquet file\nparquet_file_path = \"/content/data/products.parquet\"\ndf.write.mode(\"overwrite\").parquet(parquet_file_path)"
]
},
{
diff --git a/07-DataFrame-Operations.ipynb b/07-DataFrame-Operations.ipynb
index ed25cb8..fe9f162 100644
--- a/07-DataFrame-Operations.ipynb
+++ b/07-DataFrame-Operations.ipynb
@@ -2,17 +2,56 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 18,
- "id": "fa80df4e-9ddc-4db6-a801-717cd67ae883",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"07-DataFrame-Operations\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
@@ -52,8 +88,7 @@
}
],
"source": [
- "%%bash\n",
- "head -10 data/stocks.txt"
+ "%%bash\nhead -10 data/stocks.txt"
]
},
{
@@ -63,9 +98,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Load the synthetic data into a DataFrame\n",
- "data_file_path = \"./data/stocks.txt\"\n",
- "df = spark.read.csv(data_file_path, header=True, inferSchema=True)"
+ "# Load the synthetic data into a DataFrame\ndata_file_path = \"/content/data/stocks.txt\"\ndf = spark.read.csv(data_file_path, header=True, inferSchema=True)"
]
},
{
diff --git a/08-Spark-SQL.ipynb b/08-Spark-SQL.ipynb
index f75c80f..0d6ea2d 100644
--- a/08-Spark-SQL.ipynb
+++ b/08-Spark-SQL.ipynb
@@ -2,17 +2,56 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 52,
- "id": "7f41cf64-565f-48b4-a5e5-4d48fc404270",
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
- "# Set the PySpark environment variables\n",
- "import os\n",
- "os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
- "os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
- "os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
- "os.environ['PYSPARK_PYTHON'] = 'python'"
+ "# COLAB SETUP \u2014 run this cell first every session\n",
+ "# WHY: Google Colab resets its filesystem and Python environment on every\n",
+ "# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
+ "# Python package so no SPARK_HOME or local Spark installation is needed.\n",
+ "# The original os.environ cell hardcoded a local Mac path that does not\n",
+ "# exist on Colab's VM and has been removed from all notebooks.\n",
+ "!pip install pyspark -q\n",
+ "\n",
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder \\\n",
+ " .appName(\"08-Spark-SQL\") \\\n",
+ " .getOrCreate()\n",
+ "\n",
+ "sc = spark.sparkContext\n",
+ "print(spark.version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# DATA PATH SETUP\n",
+ "# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
+ "# this cell clones the repo data and downloads the NYC TLC dataset.\n",
+ "# It is idempotent \u2014 safe to run multiple times.\n",
+ "import os, shutil, subprocess\n",
+ "\n",
+ "os.makedirs('/content/data', exist_ok=True)\n",
+ "\n",
+ "# Clone repo data files if not present\n",
+ "if not os.path.exists('/content/pyspark-tutorial'):\n",
+ " subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
+ " '/content/pyspark-tutorial'], check=True)\n",
+ "shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
+ "\n",
+ "# Download NYC TLC dataset if not present\n",
+ "if not os.path.exists('/content/data/trips.parquet'):\n",
+ " os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
+ " trips = spark.read.parquet('/content/data/trips.parquet')\n",
+ " trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
+ " trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
+ "\n",
+ "print('Data ready:', os.listdir('/content/data'))"
]
},
{
@@ -52,8 +88,7 @@
}
],
"source": [
- "%%bash\n",
- "head -10 ./data/persons.csv"
+ "%%bash\nhead -10 /content/data/persons.csv"
]
},
{
@@ -71,9 +106,7 @@
"metadata": {},
"outputs": [],
"source": [
- "# Load the synthetic data into a DataFrame\n",
- "data_file_path = \"./data/persons.csv\"\n",
- "df = spark.read.csv(data_file_path, header=True, inferSchema=True)"
+ "# Load the synthetic data into a DataFrame\ndata_file_path = \"/content/data/persons.csv\"\ndf = spark.read.csv(data_file_path, header=True, inferSchema=True)"
]
},
{
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 0000000..36c02ed
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,67 @@
+# AGENTS.md
+
+## Purpose
+This repository teaches PySpark to beginners via 8 progressive Jupyter notebooks.
+It is designed to run on Google Colab with zero local setup required.
+
+## How to use this repo to teach PySpark
+
+1. Direct the learner to open any notebook via the Colab badge in README.md
+2. Notebook 01 contains the data bootstrap cell — it must be run first
+3. Notebooks proceed in order: environment → RDD → DataFrame → Spark SQL
+4. Notebooks 02-08 include their own idempotent data setup cell, so each is self-contained once its setup cells have run
+
+## Teaching walkthrough for AI assistants
+
+When guiding a human through this tutorial:
+- Present one stage at a time — do not advance until the current stage is complete
+- Ask test questions ONE AT A TIME — wait for the human answer before the next question
+- If the answer is wrong, correct it with a clear explanation before moving on
+- Only unlock the next stage after all questions for the current stage are answered
+- Fix Colab errors immediately when they appear — common errors are documented below
+
+## Dataset
+
+NYC TLC Yellow Taxi Trip Records (January 2023)
+- Source: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+- Download: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
+- Format: Parquet (native), CSV and JSON versions derived in bootstrap cell
+- Size: ~3 million rows, 19 columns
+- Key columns: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
+ passenger_count, trip_distance, fare_amount, tip_amount, total_amount,
+ payment_type, RatecodeID, store_and_fwd_flag
+
+Why this dataset:
+- Real financial columns (fare_amount, tip_amount, total_amount) make
+ groupBy, agg, and window function exercises meaningful
+- Datetime columns enable time-based analysis exercises
+- Natural nulls teach null handling without artificial setup
+- Widely used in industry PySpark tutorials — learners can Google it
+- Available in native Parquet format — demonstrates schema embedding
+ and columnar storage benefits directly
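+
+A minimal sketch of the kind of exercise this enables (assumes the bootstrap
+cell has already downloaded the parquet file; column names as listed above):
+
+```python
+from pyspark.sql import SparkSession, functions as F
+
+spark = SparkSession.builder.appName("tlc-sketch").getOrCreate()
+trips = spark.read.parquet("/content/data/trips.parquet")
+
+# Average tip by payment type -- the financial columns make
+# groupBy/agg exercises meaningful.
+(trips.groupBy("payment_type")
+      .agg(F.avg("tip_amount").alias("avg_tip"))
+      .orderBy(F.desc("avg_tip"))
+      .show())
+```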
+
+## Stage structure
+
+- Stage 1 (NB 01-03): Environment, SparkSession, SparkContext, Catalyst optimizer
+- Stage 2 (NB 04): RDDs — transformations, actions, lazy evaluation
+- Stage 3 (NB 05-07): DataFrames — schema, data sources, operations
+- Stage 4 (NB 08): Spark SQL — temp views, SQL vs DataFrame API, window functions
+- Stage 5 (beyond repo): Performance, partitioning, Databricks
+
+## Common Colab errors and fixes
+
+| Error | Cause | Fix |
+|-------|-------|-----|
+| FileNotFoundError: spark-submit | SPARK_HOME set to local Mac path | Delete os.environ cell, run pip setup cell |
+| NoneType has no attribute sc | SparkSession died (runtime reset) | Re-run setup cell at top of notebook |
+| Input path does not exist | Relative ./data/ path | Use /content/data/ absolute path |
+| PATH_ALREADY_EXISTS | write() without mode specified | Add .mode("overwrite") |
+| CalledProcessError exit 128 | git clone on existing directory | Skip the clone when the directory already exists (see sketch below) |
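+
+The two re-run fixes from the table, as a minimal sketch (paths match the
+notebooks' setup cells; the output parquet path is illustrative only):
+
+```python
+import os
+import subprocess
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+
+# CalledProcessError exit 128: clone only when the target is absent.
+if not os.path.exists("/content/pyspark-tutorial"):
+    subprocess.run(
+        ["git", "clone",
+         "https://github.com/coder2j/pyspark-tutorial.git",
+         "/content/pyspark-tutorial"],
+        check=True,
+    )
+
+# PATH_ALREADY_EXISTS: write with overwrite mode so re-runs succeed.
+df = spark.createDataFrame([(1, "a")], ["id", "label"])
+df.write.mode("overwrite").parquet("/content/data/out.parquet")  # illustrative path
+```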
+
+## Visual learning assets
+
+Diagrams explaining key concepts are in the /assets/ folder and embedded in README.md:
+- partition-vs-table.svg — distributed data model
+- rdd-vs-dataframe.svg — schema vs no schema
+- csv-vs-parquet.svg — columnar storage and embedded schema
+- groupby-vs-window.svg — aggregation vs window functions
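+
+For example, the distinction shown in groupby-vs-window.svg, as a small
+runnable sketch (toy data; mirrors the Notebook 08 exercise):
+
+```python
+from pyspark.sql import SparkSession, functions as F
+from pyspark.sql.window import Window
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.createDataFrame(
+    [("HR", 60000), ("HR", 55000), ("IT", 72000), ("IT", 68000)],
+    ["department", "salary"],
+)
+
+# groupBy collapses each department to a single aggregated row...
+df.groupBy("department").agg(F.avg("salary").alias("avg_salary")).show()
+
+# ...while a window function keeps every row and adds per-group context.
+w = Window.partitionBy("department").orderBy(F.desc("salary"))
+df.withColumn("rank", F.rank().over(w)).show()
+```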
diff --git a/README.md b/README.md
index b5ec4bb..6e1aae5 100644
--- a/README.md
+++ b/README.md
@@ -1,91 +1,147 @@
-# PySpark Tutorial for Beginners - Jupyter Notebooks
+# PySpark tutorial for beginners
-Welcome to the PySpark Tutorial for Beginners GitHub repository! This repository contains a collection of Jupyter notebooks used in my comprehensive [YouTube video: PySpark tutorial for beginners](https://youtu.be/EB8lfdxpirM). These notebooks provide hands-on examples and code snippets to help you understand and practice PySpark concepts covered in the tutorial video.
+A collection of 8 Jupyter notebooks teaching PySpark from environment setup
+to Spark SQL — designed to run on Google Colab with zero local setup.
-If you find this tutorial helpful, consider sharing this video with your friends and colleagues to help them unlock the power of PySpark and unlock the following bonus videos.
+[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/coder2j/pyspark-tutorial/blob/main/01-PySpark-Get-Started.ipynb)
-🎁 Bonus Videos:
-- Hit **50,000 views** to unlock a video about building an **end-to-end machine-learning pipeline with PySpark**.
-- Hit **100,000 views** to unlock another video video about **end-to-end spark streaming**.
+---
-Do you like this tutorial? Why not check out my other video of [Airflow Tutorial for Beginners](https://youtu.be/K9AnJ9_ZAXE), which has more than **350k views 👀** and around **7k likes 👍**.
+## Quick start
-Don't forget to subscribe to my [YouTube channel](https://www.youtube.com/c/coder2j) and [my blog](https://coder2j.com/) for more exciting tutorials like this. And connect me on [X/Twitter](https://twitter.com/coder2j) and [Linkedin](https://www.linkedin.com/in/coder2j/), I post content there regularly too. Thank you for your support! ❤️
+1. Click the badge above to open Notebook 01 in Colab
+2. Run the **setup cell** first (installs pyspark)
+3. Run the **data bootstrap cell** (downloads the NYC TLC dataset)
+4. Work through the notebooks in order
+> **Note:** The original tutorial was written for a local Mac installation.
+> This fork fixes all Colab compatibility issues and replaces synthetic data
+> with the [NYC TLC Yellow Taxi dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
+> for more realistic exercises.
-## Table of Contents
+---
-- [Introduction](#introduction)
-- [Getting Started](#getting-started)
-- [Notebook Descriptions](#notebook-descriptions)
-- [Prerequisites](#prerequisites)
-- [Usage](#usage)
-- [Contributing](#contributing)
-- [License](#license)
+## What changed from the original
-## Introduction
+| Issue | Fix |
+|-------|-----|
+| `os.environ SPARK_HOME` cell hardcoded a local Mac path | Deleted — replaced with `pip install pyspark` setup cell |
+| All data paths used `./data/` (relative) | Changed to `/content/data/` (absolute, works in Colab) |
+| `df.write` failed on re-run | Added `.mode("overwrite")` to all write cells |
+| `git clone` failed on re-run | Added existence check before clone |
+| Synthetic sample data with 5-20 rows | Replaced with NYC TLC dataset (~3M rows, 19 real columns) |
-In our [PySpark tutorial video](https://youtu.be/EB8lfdxpirM), we covered various topics, including Spark installation, SparkContext, SparkSession, RDD transformations and actions, Spark DataFrames, Spark SQL, and more. These Jupyter notebooks are designed to complement the video content, allowing you to follow along, experiment, and practice your PySpark skills.
+---
-## Getting Started
+## Dataset
-To get started with the Jupyter notebooks, follow these steps:
+All notebooks use the **NYC TLC Yellow Taxi Trip Records (January 2023)**.
-1. Clone this GitHub repository to your local machine using the following command:
+Downloaded automatically by the bootstrap cell in Notebook 01.
- ```bash
- git clone https://github.com/coder2j/pyspark-tutorial.git
- ```
+Key columns: `VendorID`, `tpep_pickup_datetime`, `fare_amount`,
+`tip_amount`, `total_amount`, `payment_type`, `trip_distance`, `passenger_count`
-2. Ensure you have Python and Jupyter Notebook installed on your machine.
+---
-3. Follow the YouTube video part 2: Spark Installation to make sure Spark has been installed on your machine.
+## Stage structure
-4. Launch Jupyter Notebook by running:
+### Stage 1 — Environment & architecture (Notebooks 01–03)
- ```bash
- jupyter notebook
- ```
+Covers: SparkSession, SparkContext, Catalyst optimizer, logical vs physical plan,
+lazy evaluation, `local[*]` master, key config options.
-5. Open the notebook you want to work on and start experimenting with PySpark.
+![partition-vs-table](assets/partition-vs-table.svg)
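+
+A minimal sketch of these ideas (the app name and config value are
+illustrative; the options themselves are standard Spark settings):
+
+```python
+from pyspark.sql import SparkSession
+
+# local[*]: run Spark inside this process, one worker thread per CPU core.
+spark = (SparkSession.builder
+         .appName('stage-1-demo')
+         .master('local[*]')
+         .config('spark.sql.shuffle.partitions', '8')
+         .getOrCreate())
+
+sc = spark.sparkContext  # the lower-level entry point the session wraps
+
+# Nothing executes yet: Catalyst only builds and optimizes a plan.
+df = spark.range(100).filter('id % 2 = 0')
+df.explain(True)  # parsed -> analyzed -> optimized logical -> physical plan
+df.count()        # an action finally triggers execution
+```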
-## Notebook Descriptions
+**Test yourself:**
+1. What is the difference between SparkContext and SparkSession?
+2. Why does `getOrCreate()` exist instead of `create()`?
+3. What does `local[*]` mean in the master field?
+4. What does the Catalyst optimizer do and when does it run?
+5. What is the difference between a logical plan and a physical plan?
-- **Notebook 1 - 01-PySpark-Get-Started**: Instructions and commands for setting the PySpark environment variables to use spark in jupyter notebook.
+---
-- **Notebook 2 - 02-Create-SparkContext**: Creating SparkContext objects in different PySpark versions.
+### Stage 2 — RDDs (Notebook 04)
+Covers: transformations vs actions, lazy evaluation, `map`, `filter`,
+`flatMap`, `reduce`, `collect`, `take`, `foreach`.
-- **Notebook 3 - 03-Create-SparkSession.ipynb**: Creating SparkSession objects in PySpark.
+![rdd-vs-dataframe](assets/rdd-vs-dataframe.svg)
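+
+A minimal word-count sketch of transformations vs actions, using the
+`data.txt` file written by the bootstrap cell in Notebook 01:
+
+```python
+rdd = spark.sparkContext.textFile('/content/data/data.txt')
+
+# Transformations are lazy: nothing runs here.
+counts = (rdd.flatMap(lambda line: line.split())
+             .map(lambda word: (word, 1))
+             .reduceByKey(lambda a, b: a + b))
+
+# collect() is an action: it triggers the whole chain and pulls every
+# result to the driver. Safe here only because the file is tiny.
+print(counts.collect())
+```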
-- **Notebook 4 - 04-RDD-Operations.ipynb**: Creating RDD and Demonstrating RDD transformations and actions.
+**Test yourself:**
+1. What is the difference between a transformation and an action?
+2. Why does `foreach()` produce no output in a notebook?
+3. Why should you never use `collect()` on a large dataset?
+4. What does `sc.parallelize(range(1, 11)).filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).reduce(lambda a, b: a + b)` return?
+5. What does resilient mean in RDD?
-- **Notebook 5 - 05-DataFrame-Intro.ipynb**: Introduction to Spark DataFrames and differences compared to RDD.
+---
-- **Notebook 6 - 06-DataFrame-from-various-data-source.ipynb**: Creating Spark Dataframe from various data sources.
+### Stage 3 — DataFrames (Notebooks 05–07)
-- **Notebook 7 - 07-DataFrame-Operations.ipynb**: Performing Spark Dataframe operations like filtering, aggregation, etc.
+Covers: schema, `printSchema()`, `show()`, `count()`, reading CSV/JSON/Parquet,
+write modes, `select`, `filter`, `withColumn`, `groupBy`, `agg`, `join`,
+null handling, `explode`, `split`.
-- **Notebook 8 - 08-Spark-SQL.ipynb**: Converting Spark Dataframe to a temporary table or view and performing SQL operations using Spark SQL.
+![csv-vs-parquet](assets/csv-vs-parquet.svg)
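+
+A minimal sketch over the CSV copy written by the bootstrap cell (the
+derived `tip_pct` column is illustrative):
+
+```python
+from pyspark.sql import functions as F
+
+# Without inferSchema=True every CSV column would be read as a string.
+df = spark.read.csv('/content/data/trips_csv', header=True, inferSchema=True)
+
+# withColumn replaces the column if the name already exists.
+df = df.withColumn(
+    'tip_pct',
+    F.round(F.col('tip_amount') / F.col('total_amount') * 100, 1))
+
+(df.filter(F.col('total_amount') > 0)
+   .groupBy('payment_type')
+   .agg(F.avg('tip_pct').alias('avg_tip_pct'))
+   .show())
+```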
-Feel free to explore and run these notebooks at your own pace.
+**Test yourself:**
+1. What is the difference between an RDD tuple and a DataFrame Row?
+2. Why does `printSchema()` on a Parquet file return instantly?
+3. What happens if you read a CSV without `inferSchema=True`?
+4. What does `withColumn` do if the column name already exists?
+5. Why is Parquet faster than CSV for analytical queries on wide tables?
+6. What does `explode` require as its input type?
-## Prerequisites
+---
-To make the most of these notebooks, you should have the following prerequisites:
+### Stage 4 — Spark SQL (Notebook 08)
-- Basic knowledge of Python programming.
+Covers: `createOrReplaceTempView`, `spark.sql()`, SQL vs DataFrame API
+performance, window functions (`rank`, `dense_rank`, `lag`, `lead`).
-- Understanding of data processing concepts (though no prior PySpark experience is required).
+![groupby-vs-window](assets/groupby-vs-window.svg)
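+
+A minimal sketch: register a temp view, then rank trips within each payment
+type while keeping every row, which a plain `groupBy` cannot do:
+
+```python
+trips = spark.read.parquet('/content/data/trips.parquet')
+trips.createOrReplaceTempView('trips')
+
+# rank() adds a per-partition rank without collapsing the rows.
+spark.sql("""
+    SELECT payment_type,
+           total_amount,
+           RANK() OVER (PARTITION BY payment_type
+                        ORDER BY total_amount DESC) AS rnk
+    FROM trips
+""").filter('rnk <= 3').show()
+```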
-## Usage
+**Test yourself:**
+1. How do you make a DataFrame queryable with SQL in Spark?
+2. What is the performance difference between Spark SQL and the DataFrame API?
+3. What does a window function like `rank()` do that `groupBy` cannot?
+4. What is the difference between `rank()` and `dense_rank()`?
-These notebooks are meant for self-learning and practice. Follow along with the [tutorial video](https://youtu.be/EB8lfdxpirM) to gain a deeper understanding of PySpark concepts. Experiment with the code, modify it and try additional exercises to solidify your skills.
+---
+
+### Stage 5 — Intermediate (beyond this repo)
+
+Topics to explore next:
+- Partitioning: `repartition()`, `coalesce()`, `spark.sql.shuffle.partitions`
+- Reading execution plans: `df.explain(True)`
+- Broadcast joins: `df.join(broadcast(small_df), "id")`
+- Databricks: pre-injected session, `display()` vs `show()`
+- Portfolio project: 5 business questions on the NYC TLC dataset using only PySpark
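+
+A minimal broadcast-join sketch to start from (the lookup table is
+illustrative; the codes follow the TLC data dictionary):
+
+```python
+from pyspark.sql.functions import broadcast
+
+trips = spark.read.parquet('/content/data/trips.parquet')
+
+# A tiny lookup table: small enough to ship whole to every executor.
+payment_names = spark.createDataFrame(
+    [(1, 'Credit card'), (2, 'Cash'), (3, 'No charge'), (4, 'Dispute')],
+    ['payment_type', 'payment_name'])
+
+joined = trips.join(broadcast(payment_names), 'payment_type')
+joined.explain()  # look for BroadcastHashJoin instead of SortMergeJoin
+
+# coalesce reduces the partition count without a full shuffle.
+joined = joined.coalesce(4)
+```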
+
+---
+
+## Notebook descriptions
+
+| Notebook | Topic |
+|----------|-------|
+| 01-PySpark-Get-Started | Environment setup, SparkSession creation |
+| 02-Create-SparkContext | SparkContext and its relationship to SparkSession |
+| 03-Create-SparkSession | Session config options and builder pattern |
+| 04-RDD-Operations | RDD transformations, actions, lazy evaluation |
+| 05-DataFrame-Intro | DataFrame basics, schema, show, printSchema |
+| 06-DataFrame-from-various-data-source | Reading CSV, JSON, Parquet; write modes |
+| 07-DataFrame-Operations | select, filter, join, groupBy, withColumn, nulls |
+| 08-Spark-SQL | Temp views, Spark SQL, window functions |
+
+---
## Contributing
-If you'd like to contribute to this repository by adding more notebooks, improving documentation, or fixing issues, please feel free to fork the repository, make your changes, and submit a pull request. We welcome contributions from the community!
+Found an issue or want to add a notebook? PRs welcome.
+See [AGENTS.md](AGENTS.md) for AI-assisted teaching guidance.
## License
-This project is licensed under the [MIT License](LICENSE.md).
+MIT. See [LICENSE.md](LICENSE.md).
diff --git a/assets/csv-vs-parquet.svg b/assets/csv-vs-parquet.svg
new file mode 100644
index 0000000..3867fac
--- /dev/null
+++ b/assets/csv-vs-parquet.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/assets/groupby-vs-window.svg b/assets/groupby-vs-window.svg
new file mode 100644
index 0000000..fa14f41
--- /dev/null
+++ b/assets/groupby-vs-window.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/assets/partition-vs-table.svg b/assets/partition-vs-table.svg
new file mode 100644
index 0000000..f72c25f
--- /dev/null
+++ b/assets/partition-vs-table.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/assets/rdd-vs-dataframe.svg b/assets/rdd-vs-dataframe.svg
new file mode 100644
index 0000000..534f29e
--- /dev/null
+++ b/assets/rdd-vs-dataframe.svg
@@ -0,0 +1 @@
+
\ No newline at end of file