Data Engineering Project for Beginners | Airflow, API, GCP, BigQuery, Coder
🎥 Watch Youtube tutorial
💻 Use Github repo
A real-world case project to give you hands-on experience in creating your own Airflow pipeline and grasping what Idempotency, Partitioning, and Backfilling are.
🧭 Plan:
Pull OpenWeather API data → land it in the data lake as Parquet files on GCP → load it from staging to production tables in the data warehouse (BigQuery)
🏆 Run the pipeline with Airflow using Coder - an open-source cloud development environment you download and host in any cloud. It deploys in seconds and provisions the infrastructure, IDE, language, and tools you want. Used by teams at Palantir, Dropbox, Discord, and many more.
Absolutely FREE, a few clicks to launch, and super user-friendly.
Let's set things up:
PART 1
First, make sure Docker is up and running (install it if needed: https://docs.docker.com/desktop/install/mac-install/).
Then open your terminal and run this command to install Coder:
curl -L https://coder.com/install.sh | sh
Next, start Coder with:
coder server
Open your browser, navigate to http://localhost:3000 → create your user
💣 Boom, the platform is up and running!
Now click Templates → Starter Templates → pick Docker containers
After it's provisioned, let's edit it a little: Dockerfile → Edit files → add these two packages to the existing apt-get install list:
python3 \
python3-pip \
main.tf → Edit files → add this module block after the terraform block:
(or copy from https://registry.coder.com/modules/apache-airflow)
module "airflow" {
source = "registry.coder.com/modules/apache-airflow/coder"
version = "1.0.13"
agent_id = coder_agent.main.id
}
Click Build and then Publish
Now let’s create a workspace from the template:
Click Workspaces → Create → choose your newly built template → click the Airflow button → create a user → Tada 🎉
Now your Airflow instance is ready & steady 🏎️
PART 2
Set up a connection to Google Cloud Platform - we'll need a GCP service account (essentially credentials for accessing the Google Cloud platform programmatically):
- Create a GCP account (it comes with free credits for newcomers, so don't worry about the cost https://cloud.google.com/free/docs/free-cloud-features);
- Console Access: Go to the GCP Console, navigate to the IAM & Admin section, and select Service Accounts.
- Create Service Account: Click on "Create Service Account", provide a name, description, and click "Create".
- Grant Access: Assign the Editor role (just for simplicity).
- Create Key: Click on "Create Key", select JSON, and then "Create". This downloads a JSON key file. Keep this file secure, as it provides API access to your GCP resources.
- In the Airflow Connections tab, find "google_cloud_default" → under Keyfile JSON → paste the WHOLE JSON file's contents → Save (see the optional sanity check below)
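(Optional) Before pasting the key into Airflow, you can sanity-check it locally with a tiny script. A minimal sketch, assuming the google-cloud-storage package is installed and key.json is the file you just downloaded (both names are placeholders - adjust to yours):

# Quick local check that the service-account key works (not part of the DAG)
from google.cloud import storage

client = storage.Client.from_service_account_json("key.json")
print([bucket.name for bucket in client.list_buckets()])  # should list your buckets without errors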
Set up variables
- In GCP, create a new project → get its ID
- Create an account with OpenWeather https://openweathermap.org/ → get an API key
- In the Airflow Variables tab, create these variables (also create a GCS bucket named weather-tutorial in Google Cloud Storage - the pipeline uploads into it but doesn't create it):
weather-api-key = 'API_KEY'
bq_data_warehouse_project = 'your project ID'
gcs-bucket = 'weather-tutorial'
PART 3
Create a /dags folder and our first DAG, called data_ingestion.py.
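At the top of data_ingestion.py we'll need a handful of imports - roughly this set, assuming the apache-airflow-providers-google package is installed (if it isn't in your workspace yet, pip install it):

import os
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator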
I like to start by writing the generic outline of the DAG first, like:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'weather_data_ingestion',
    default_args=default_args,
    description='Fetch weather data and store in BigQuery',
    schedule_interval='@daily',
)
Next, let's outline the steps we need the DAG to perform:
fetch_weather_data_task >> gcs_to_bq_staging_task >> create_table_with_schema >> stg_to_prod_task
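If you want this outline to parse in the Airflow UI before the real tasks exist, you can temporarily stub the four tasks with EmptyOperator (just a convenience trick, not something from the repo) and swap them out one by one as we go:

from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; older versions ship DummyOperator instead

# Temporary placeholders; each gets replaced by the real operator below
fetch_weather_data_task = EmptyOperator(task_id="fetch_weather_data", dag=dag)
gcs_to_bq_staging_task = EmptyOperator(task_id="gcs_to_bigquery", dag=dag)
create_table_with_schema = EmptyOperator(task_id="create_table_with_schema", dag=dag)
stg_to_prod_task = EmptyOperator(task_id="upsert_staging_to_prod_task", dag=dag)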
Next, let's define the global variables we'd like to use here, pulling them safely from Airflow Variables:
# naming global variables in CAPITAL letters is one of the best practices
API_KEY = Variable.get("weather-api-key")
GCS_BUCKET = Variable.get("gcs-bucket")
PROJECT_ID = Variable.get("bq_data_warehouse_project")
BQ_DATASET = "weather"
BQ_STAGING_DATASET = f"stg_{BQ_DATASET}"
TABLE_NAME = 'daily_data'
SQL_PATH = f"{os.path.abspath(os.path.dirname(__file__))}/sql/"
LAT = 40.7128 # Example: New York City latitude
LON = -74.0060 # Example: New York City longitude
Okay, let's start with the first task, fetch_weather_data_task:
# It's a PythonOperator, as we are going to write a Python function that pulls the data
fetch_weather_data_task = PythonOperator(
    task_id='fetch_weather_data',
    python_callable=fetch_weather_data,
    dag=dag,
)
Let's define our function fetch_weather_data.
We are going to save the data in Parquet format (you could use CSV to make things easier, though), as it's one of the best practices:
Parquet stores data in a columnar format: each column is stored together. That makes it better for compression, lets query engines skip reading unnecessary data while processing queries, and is optimized for analytics workloads.
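As a quick local illustration of that column-pruning benefit (not part of the DAG; assumes pandas with pyarrow installed):

import pandas as pd

df = pd.DataFrame({"temp": [280.1, 281.4], "humidity": [81, 77], "wind_speed": [3.6, 5.1]})
df.to_parquet("weather_sample.parquet", index=False)

# Read back only the column we actually need - the engine skips the rest on disk
print(pd.read_parquet("weather_sample.parquet", columns=["temp"]))

Now, back to the task - here's the function itself: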
def fetch_weather_data(**context):
    unix_timestamp, date = date_to_unix_timestamp()
    url = f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={LAT}&lon={LON}&dt={unix_timestamp}&appid={API_KEY}"
    # Make the request
    response = requests.get(url)
    data = response.json()["data"]
    df = pd.DataFrame(data)
    # Create an extra column with the date in a non-unix, human-readable format
    df['datetime'] = date
    # Save the DataFrame to Parquet
    filename = f"weather_data_{date}.parquet"
    """
    Push the filename into XCom - XCom (short for cross-communication) is a
    mechanism that allows tasks to exchange messages or small amounts of data.
    The variable has function scope, but we need to use it in the next task.
    """
    context['ti'].xcom_push(key='filename', value=filename)
    # Upload the file
    gcs_hook = GCSHook()  # it uses the default GCP connection 'google_cloud_default'
    gcs_hook.upload(bucket_name=GCS_BUCKET, object_name=filename, data=df.to_parquet(index=False))
We also need the date_to_unix_timestamp() helper.
Since the API requires a Unix timestamp, we can separate the conversion into a distinct function:
def date_to_unix_timestamp():
    # Get the current date
    date = datetime.now().date()
    # Convert to a datetime object with time set to midnight
    date_converted = datetime.combine(date, datetime.min.time())
    # Convert to Unix timestamp (UTC time zone)
    unix_timestamp = int(date_converted.replace(tzinfo=timezone.utc).timestamp())
    return unix_timestamp, date
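If you want to sanity-check the conversion locally: midnight UTC on a given date maps to one fixed Unix timestamp, for example:

from datetime import datetime, timezone

dt = datetime(2024, 3, 2, tzinfo=timezone.utc)  # 2024-03-02 00:00:00 UTC
print(int(dt.timestamp()))  # 1709337600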
Now that the data is pulled into Google Cloud Storage, let's go to the next task: gcs_to_bq_staging_task,
where we push our data from the data lake into the data warehouse. We are going to do it in two steps:
first, load it into the staging area, and then write a SQL script that upserts the data into the production data warehouse table.
By upserting I mean the practice of inserting rows that are not yet present in the target table and updating the ones that already exist with new values.
This time we don't need a PythonOperator: we can use the pre-built operators from the apache-airflow-providers-google package, which is easier and more convenient (one note: make sure the weather and stg_weather datasets exist in BigQuery first - these operators create tables, not datasets):
gcs_to_bq_staging_task = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery",
    bucket=GCS_BUCKET,
    source_objects=["{{ti.xcom_pull(key='filename')}}"],  # pull the filename from XCom, pushed by the previous task
    destination_project_dataset_table=f'{PROJECT_ID}.{BQ_STAGING_DATASET}.stg_{TABLE_NAME}',
    create_disposition='CREATE_IF_NEEDED',  # automatically creates the table for us
    write_disposition='WRITE_TRUNCATE',  # automatically drops previously stored data in the table
    time_partitioning={'type': 'DAY', 'field': 'datetime'},  # remember partitioning from the beginning? here it comes!
    gcp_conn_id="google_cloud_default",
    source_format='PARQUET',
    dag=dag,
)
Next, we are going to create the target table with create-if-not-exists logic, explicitly stating the schema:
create_table_with_schema = BigQueryCreateEmptyTableOperator(
    task_id='create_table_with_schema',
    project_id=PROJECT_ID,
    dataset_id=BQ_DATASET,
    table_id=TABLE_NAME,
    time_partitioning={'type': 'DAY', 'field': 'datetime'},
    schema_fields=[
        {"name": "dt", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "sunrise", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "sunset", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "temp", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "feels_like", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "pressure", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "humidity", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "dew_point", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "clouds", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "visibility", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "wind_speed", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "wind_deg", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "weather", "type": "RECORD", "mode": "NULLABLE", "fields": [
            {"name": "list", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "element", "type": "RECORD", "mode": "NULLABLE", "fields": [
                    {"name": "description", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "icon", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
                    {"name": "main", "type": "STRING", "mode": "NULLABLE"}
                ]}
            ]}
        ]},
        {"name": "datetime", "type": "DATE", "mode": "NULLABLE"}
    ],
    dag=dag,
)
And lastly, we create stg_to_prod_task,
which pulls data from staging and upserts it with BigQueryInsertJobOperator:
stg_to_prod_task = BigQueryInsertJobOperator(
    task_id="upsert_staging_to_prod_task",
    project_id=PROJECT_ID,
    configuration={
        "query": {
            "query": open(f"{SQL_PATH}upsert_table.sql", 'r').read()
                .replace('{project_id}', PROJECT_ID)
                .replace('{bq_dataset}', BQ_DATASET)
                .replace('{table_name}', TABLE_NAME),
            # .replace('{partition_date}', date.today().isoformat()),
            "useLegacySql": False
        },
        "createDisposition": "CREATE_IF_NEEDED",
        "destinationTable": {
            "project_id": PROJECT_ID,
            "dataset_id": BQ_DATASET,
            "table_id": TABLE_NAME
        }
    },
    dag=dag
)
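I haven't shown sql/upsert_table.sql here - grab the real script from the repo - but as a rough, hypothetical sketch it could be a BigQuery MERGE along these lines, with {project_id}, {bq_dataset}, and {table_name} being the placeholders the task replaces (column list trimmed for brevity):

# Hypothetical sketch of what sql/upsert_table.sql might contain (the repo's script may differ).
# It merges the staging table into production, matching rows on the dt timestamp.
UPSERT_SQL_SKETCH = """
MERGE `{project_id}.{bq_dataset}.{table_name}` AS prod
USING `{project_id}.stg_{bq_dataset}.stg_{table_name}` AS stg
ON prod.dt = stg.dt
WHEN MATCHED THEN
  UPDATE SET temp = stg.temp, humidity = stg.humidity, datetime = stg.datetime
WHEN NOT MATCHED THEN
  INSERT ROW
"""

Re-running a MERGE like this simply updates the same rows again instead of duplicating them, which is what keeps the load idempotent.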
Let's run our DAG now! It should all be good, but let's double-check that the resources are in place by looking at our data lake and data warehouse.
To give this pipeline the option of backfilling - meaning populating data for previous periods - let's add these two tweaks:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'backfill_date': datetime.strptime('2024-03-02', '%Y-%m-%d').date()
}
def date_to_unix_timestamp(date):
    if date is None:
        # Get the current date
        date = datetime.now().date()
    # Convert to a datetime object with time set to midnight
    date_converted = datetime.combine(date, datetime.min.time())
    # Convert to Unix timestamp (UTC time zone)
    unix_timestamp = int(date_converted.replace(tzinfo=timezone.utc).timestamp())
    return unix_timestamp, date
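And to make the first task actually use that date, fetch_weather_data just needs to hand it to the helper - a minimal sketch of my assumed wiring (the repo may do this slightly differently):

def fetch_weather_data(**context):
    # Pull the optional backfill date from the DAG's default_args;
    # date_to_unix_timestamp() falls back to "today" when it's None
    backfill_date = context["dag"].default_args.get("backfill_date")
    unix_timestamp, date = date_to_unix_timestamp(backfill_date)
    # ... the rest of the function stays the same as before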
JFYI, idempotence is a funky word that often trips people up, but it simply means that running the pipeline repeatedly produces the same result - for example, re-running today's run overwrites the same weather_data_<date>.parquet file and truncates-then-reloads the staging table instead of duplicating rows.
To stop your project, just click 'Stop' in the Coder UI, and clean up the Docker containers and images afterward.
In case you've shut Docker down, just relaunch it and run
coder login <https://[YOUR_URL].try.coder.app/>
There you have it, dears: a simple yet helpful pipeline at your fingertips, and an easy-to-launch platform to play around with Airflow DAGs. Please tell me which topics you want me to cover next, and leave your comments below. Until then, stay curious!