Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 61 additions & 20 deletions 01-PySpark-Get-Started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,63 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "fe9fd1c0-db30-47b1-bbe2-0b1cbd97a9e2",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set the PySpark environment variables\n",
"import os\n",
"os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
"os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
"os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
"os.environ['PYSPARK_PYTHON'] = 'python'"
"# COLAB SETUP \u2014 run this cell first every session\n",
"# WHY: Google Colab resets its filesystem and Python environment on every\n",
"# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
"# Python package so no SPARK_HOME or local Spark installation is needed.\n",
"# The original os.environ cell hardcoded a local Mac path that does not\n",
"# exist on Colab's VM and has been removed from all notebooks.\n",
"!pip install pyspark -q\n",
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder \\\n",
" .appName(\"01-PySpark-Get-Started\") \\\n",
" .getOrCreate()\n",
"\n",
"sc = spark.sparkContext\n",
"print(spark.version)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# DATA BOOTSTRAP \u2014 run once per session after setup cell\n",
"# WHY: Colab's filesystem resets each session. This downloads the NYC TLC\n",
"# Yellow Taxi dataset \u2014 a real-world financial dataset with 19 columns\n",
"# including fare_amount, tip_amount, payment_type, and datetime fields.\n",
"# It replaces the synthetic sample files from the original repo which\n",
"# were too small and too simple to demonstrate meaningful PySpark operations.\n",
"# Run this notebook first. All subsequent notebooks reference /content/data/.\n",
"import os, shutil, subprocess\n",
"\n",
"os.makedirs('/content/data', exist_ok=True)\n",
"\n",
"# Download NYC TLC Yellow Taxi Jan 2023 (~50MB, ~3M rows)\n",
"if not os.path.exists('/content/data/trips.parquet'):\n",
" os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
"\n",
"trips = spark.read.parquet('/content/data/trips.parquet')\n",
"\n",
"# CSV version for CSV exercises\n",
"trips.limit(10000).write.mode('overwrite').option('header', True).csv('/content/data/trips_csv')\n",
"\n",
"# JSON version for JSON exercises\n",
"trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
"\n",
"# Text version for word count / text exercises\n",
"with open('/content/data/data.txt', 'w') as f:\n",
" f.write('hello world\\nhello spark\\nspark is fast\\nworld of spark\\nhello hello world\\n')\n",
"\n",
"print(f'Dataset ready: {trips.count():,} rows, {len(trips.columns)} columns')\n",
"trips.printSchema()"
]
},
{
Expand All @@ -22,8 +68,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Import PySpark\n",
"from pyspark.sql import SparkSession"
"# Import PySpark\nfrom pyspark.sql import SparkSession"
]
},
{
Expand All @@ -43,10 +88,7 @@
}
],
"source": [
"# Create a SparkSession\n",
"spark = SparkSession.builder \\\n",
" .appName(\"PySpark-Get-Started\") \\\n",
" .getOrCreate()"
"# Create a SparkSession\nspark = SparkSession.builder \\\n .appName(\"PySpark-Get-Started\") \\\n .getOrCreate()"
]
},
{
Expand Down Expand Up @@ -78,10 +120,7 @@
}
],
"source": [
"# Test the setup\n",
"data = [(\"Alice\", 25), (\"Bob\", 30), (\"Charlie\", 35)]\n",
"df = spark.createDataFrame(data, [\"Name\", \"Age\"])\n",
"df.show()"
"# Test the setup\ndata = [(\"Alice\", 25), (\"Bob\", 30), (\"Charlie\", 35)]\ndf = spark.createDataFrame(data, [\"Name\", \"Age\"])\ndf.show()"
]
},
{
Expand All @@ -90,7 +129,9 @@
"id": "096c5a89-058c-488a-9d9e-146fdb6a44dd",
"metadata": {},
"outputs": [],
"source": []
"source": [
""
]
}
],
"metadata": {
Expand All @@ -114,4 +155,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
80 changes: 54 additions & 26 deletions 02-Create-SparkContext.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,55 @@
{
"cell_type": "code",
"execution_count": null,
"id": "9b42a8f0-9a63-461c-95dc-27847f5b0a40",
"metadata": {},
"outputs": [],
"source": [
"# Set the PySpark environment variables\n",
"import os\n",
"os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
"os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
"os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
"os.environ['PYSPARK_PYTHON'] = 'python'"
"# COLAB SETUP \u2014 run this cell first every session\n",
"# WHY: Google Colab resets its filesystem and Python environment on every\n",
"# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
"# Python package so no SPARK_HOME or local Spark installation is needed.\n",
"# The original os.environ cell hardcoded a local Mac path that does not\n",
"# exist on Colab's VM and has been removed from all notebooks.\n",
"!pip install pyspark -q\n",
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder \\\n",
" .appName(\"02-Create-SparkContext\") \\\n",
" .getOrCreate()\n",
"\n",
"sc = spark.sparkContext\n",
"print(spark.version)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# DATA PATH SETUP\n",
"# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
"# this cell clones the repo data and downloads the NYC TLC dataset.\n",
"# It is idempotent \u2014 safe to run multiple times.\n",
"import os, shutil, subprocess\n",
"\n",
"os.makedirs('/content/data', exist_ok=True)\n",
"\n",
"# Clone repo data files if not present\n",
"if not os.path.exists('/content/pyspark-tutorial'):\n",
" subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
" '/content/pyspark-tutorial'], check=True)\n",
"shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
"\n",
"# Download NYC TLC dataset if not present\n",
"if not os.path.exists('/content/data/trips.parquet'):\n",
" os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
" trips = spark.read.parquet('/content/data/trips.parquet')\n",
" trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
" trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
"\n",
"print('Data ready:', os.listdir('/content/data'))"
]
},
{
Expand All @@ -30,10 +69,7 @@
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext\n",
"\n",
"# Create a SparkContext object\n",
"sc = SparkContext(appName=\"MySparkApplication\")"
"from pyspark import SparkContext\n\n# Create a SparkContext object\nsc = SparkContext(appName=\"MySparkApplication\")"
]
},
{
Expand Down Expand Up @@ -82,8 +118,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Shut down the current active SparkContext\n",
"sc.stop()"
"# Shut down the current active SparkContext\nsc.stop()"
]
},
{
Expand All @@ -101,15 +136,7 @@
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"# Create a SparkSession\n",
"spark = SparkSession.builder \\\n",
" .appName(\"MySparkApplication\") \\\n",
" .getOrCreate()\n",
"\n",
"# Get the SparkContext from the SparkSession\n",
"sc = spark.sparkContext\n"
"from pyspark.sql import SparkSession\n\n# Create a SparkSession\nspark = SparkSession.builder \\\n .appName(\"MySparkApplication\") \\\n .getOrCreate()\n\n# Get the SparkContext from the SparkSession\nsc = spark.sparkContext\n"
]
},
{
Expand Down Expand Up @@ -158,8 +185,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Shut down the current active SparkContext\n",
"sc.stop() #or spark.stop()"
"# Shut down the current active SparkContext\nsc.stop() #or spark.stop()"
]
},
{
Expand All @@ -168,7 +194,9 @@
"id": "10336d77-bc36-4101-8055-dd7a2496d4da",
"metadata": {},
"outputs": [],
"source": []
"source": [
""
]
}
],
"metadata": {
Expand All @@ -192,4 +220,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
74 changes: 53 additions & 21 deletions 03-Create-SparkSession.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,55 @@
{
"cell_type": "code",
"execution_count": null,
"id": "5f80ab89-af0e-4139-8f6a-0a382310f34c",
"metadata": {},
"outputs": [],
"source": [
"# Set the PySpark environment variables\n",
"import os\n",
"os.environ['SPARK_HOME'] = \"/Users/coder2j/Apps/Spark\"\n",
"os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'\n",
"os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'\n",
"os.environ['PYSPARK_PYTHON'] = 'python'"
"# COLAB SETUP \u2014 run this cell first every session\n",
"# WHY: Google Colab resets its filesystem and Python environment on every\n",
"# runtime restart. pip install pyspark bundles Spark binaries inside the\n",
"# Python package so no SPARK_HOME or local Spark installation is needed.\n",
"# The original os.environ cell hardcoded a local Mac path that does not\n",
"# exist on Colab's VM and has been removed from all notebooks.\n",
"!pip install pyspark -q\n",
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder \\\n",
" .appName(\"03-Create-SparkSession\") \\\n",
" .getOrCreate()\n",
"\n",
"sc = spark.sparkContext\n",
"print(spark.version)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# DATA PATH SETUP\n",
"# WHY: If running this notebook without first running 01-PySpark-Get-Started,\n",
"# this cell clones the repo data and downloads the NYC TLC dataset.\n",
"# It is idempotent \u2014 safe to run multiple times.\n",
"import os, shutil, subprocess\n",
"\n",
"os.makedirs('/content/data', exist_ok=True)\n",
"\n",
"# Clone repo data files if not present\n",
"if not os.path.exists('/content/pyspark-tutorial'):\n",
" subprocess.run(['git','clone','https://github.com/coder2j/pyspark-tutorial.git',\n",
" '/content/pyspark-tutorial'], check=True)\n",
"shutil.copytree('/content/pyspark-tutorial/data', '/content/data', dirs_exist_ok=True)\n",
"\n",
"# Download NYC TLC dataset if not present\n",
"if not os.path.exists('/content/data/trips.parquet'):\n",
" os.system('wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -O /content/data/trips.parquet')\n",
" trips = spark.read.parquet('/content/data/trips.parquet')\n",
" trips.limit(10000).write.mode('overwrite').option('header',True).csv('/content/data/trips_csv')\n",
" trips.limit(1000).write.mode('overwrite').json('/content/data/trips_json')\n",
"\n",
"print('Data ready:', os.listdir('/content/data'))"
]
},
{
Expand Down Expand Up @@ -40,14 +79,7 @@
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"# Create a SparkSession\n",
"spark = SparkSession.builder \\\n",
" .appName(\"MySparkApplication\") \\\n",
" .config(\"spark.executor.memory\", \"2g\") \\\n",
" .config(\"spark.sql.shuffle.partitions\", \"4\") \\\n",
" .getOrCreate()"
"from pyspark.sql import SparkSession\n\n# Create a SparkSession\nspark = SparkSession.builder \\\n .appName(\"MySparkApplication\") \\\n .config(\"spark.executor.memory\", \"2g\") \\\n .config(\"spark.sql.shuffle.partitions\", \"4\") \\\n .getOrCreate()"
]
},
{
Expand Down Expand Up @@ -91,8 +123,7 @@
}
],
"source": [
"# Perform operations using the SparkSession\n",
"spark"
"# Perform operations using the SparkSession\nspark"
]
},
{
Expand All @@ -102,15 +133,16 @@
"metadata": {},
"outputs": [],
"source": [
"# Shut down the current active SparkSession\n",
"spark.stop()"
"# Shut down the current active SparkSession\nspark.stop()"
]
},
{
"cell_type": "markdown",
"id": "f11e547f-54bd-43ee-b051-6722802bd567",
"metadata": {},
"source": []
"source": [
""
]
}
],
"metadata": {
Expand All @@ -134,4 +166,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
Loading