From manual process to automated pipeline
Every month, the NSW Government publishes a dataset through its FuelCheck program listing petrol prices across the state. It covers every major fuel type including E10, U91, P95, P98, diesel, and LPG, across more than a thousand stations. It is genuinely useful data. And for a while, I had been processing it manually.
The old process had a consistent set of problems. I had to check the NSW Government website each month to see if a new file was available, then download it and run a Python script locally. Errors required manual investigation with no structured log to reference. The database lived on my PC with no remote access, so nothing ran if I was not at my desk. And there was no data quality layer at all — a bad record either caused a crash or slipped through silently into the fact table.
The core problem was not any single one of these issues — it was that they compounded. No logging meant errors were invisible. No remote access meant the pipeline was tied to one machine. No DQ layer meant bad data had nowhere to be caught. Fixing one without the others would not have solved much.
The goal
Rebuild the pipeline as a fully automated, cloud-hosted ELT system with zero local dependencies, structured logging on every run, and automated data quality checks that catch and fix problems before they reach production.
Environment and components
The pipeline is made up of five distinct layers that work together: Python modules, API connections, stored procedures, the database, and logging.
Python modules handle orchestration, file retrieval, and data loading. They are thin by design. Each module is responsible for a single stage of the pipeline and hands off to the next via the orchestrator. Transformation and validation logic lives in the database, not in Python.
API connections are how the Python modules talk to the outside world. SQLAlchemy manages the connection to Neon PostgreSQL, handling query execution and data loading. The NSW Government open data portal is called once per run by the file retrieval module to download the latest monthly file. GitHub's API is used throughout the pipeline to commit log files and config updates back to the repository.
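As a rough sketch of what the Neon connection looks like from Python (the environment variable name here is an assumption, not the project's actual config):
import os
from sqlalchemy import create_engine

# Neon exposes a standard PostgreSQL connection string; the env var name is illustrative
engine = create_engine(os.environ["NEON_DATABASE_URL"], pool_pre_ping=True)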
Stored procedures are where the meaningful logic lives. Because data is loaded into staging tables early, all quality checking, autofixing, and production promotion is written in SQL and executed as callable procedures from Python. This keeps the logic close to the data, version-controlled, and easy to test independently.
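Executing those procedures from Python then stays trivial. A minimal sketch using the same SQLAlchemy engine (the CALL pattern is standard PostgreSQL; the project's actual wrapper code may differ):
from sqlalchemy import text

# Run the DQ procedures inside one transaction; COMMIT happens on successful exit
with engine.begin() as conn:
    conn.execute(text("CALL check_data_quality();"))
    conn.execute(text("CALL update_data_quality_autofix();"))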
The database is a Neon PostgreSQL instance organised into three layers:
- Staging tables — temporary working tables that hold raw and partially processed data for the current run: stg_fuel_price, stg_new_stations, stg_inactive_stations, and stg_updated_stations
- Fact table — fact_fuel_prices, the central table holding daily price observations that have passed quality checks and been promoted from staging
- Dimension tables — dim_fuel_stations, dim_fuel_types, and dim_date, which provide the reference data that fact records join to for analysis
Logging operates at two levels. The orchestrator generates a timestamped log file for every run, capturing the start and end of each module, any skip events, and the full stderr output of any failure before the workflow exits. These files are committed to GitHub immediately on failure and at the end of every successful run. Inside the database, the sys_run_log table tracks stored procedure execution, recording the procedure name, the operation performed, and the number of rows affected. The retention policy automatically clears workflow logs in GitHub older than 90 days and deletes database records older than 24 months.
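A minimal sketch of how the run-scoped logger could be configured (the directory and filename pattern are assumptions, not the project's exact layout):
import logging
import os
from datetime import datetime, timezone

# One log file per run, named by UTC timestamp; path and format are illustrative
os.makedirs("logs", exist_ok=True)
log_file = f"logs/pipeline_{datetime.now(timezone.utc):%Y%m%d_%H%M%S}.log"
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
logger = logging.getLogger("orchestrator")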
Architecture overview
The pipeline runs on a monthly CRON schedule defined in GitHub Actions, aligned to the NSW Government's release cycle. There is nothing saved locally. Compute runs in GitHub Actions, scripts are version-controlled in GitHub, and data lives in Neon PostgreSQL.
# pipeline.yml (simplified)
on:
  schedule:
    - cron: '0 6 2 * *'   # 6am on the 2nd of each month
  workflow_dispatch:      # manual trigger for testing
The key architectural decision was to push data into PostgreSQL as early as possible and do all meaningful transformation and validation inside the database. This is an ELT approach, not ETL, and the distinction matters: raw data lands in staging tables first, and from that point forward all transformation, quality checking, and promotion logic lives in SQL stored procedures. Python modules are thin orchestration layers that do not process data themselves. The previous version processed everything in Python before writing to the database; moving to ELT puts the transformation logic closer to the data, makes it easier to test, and makes it straightforward to extend.
The orchestrator
The orchestrator has two core functions: run_module() and push_file_to_repo().
run_module() executes each Python module as a subprocess, passes the shared log file path as an argument, and handles three possible outcomes: a clean exit, a graceful skip (return code 10, used when conditions are not met, such as no new file being available), and a failure.
def run_module(module_path):
    """Runs a Python module as a subprocess and handles its exit code."""
    # log_file and logger are defined at module level in the orchestrator
    try:
        logger.info(f"Starting {module_path}")
        result = subprocess.run(
            ["python", module_path, "--log-file", log_file],
            check=False,
            capture_output=True,
            text=True,
            timeout=60
        )
        if result.returncode == 10:
            # Graceful skip: the module's conditions were not met (e.g. no new file)
            logger.info(f"Conditions not met in {module_path} - Skipping Module")
            return
        if result.returncode != 0:
            logger.error(f"Module {module_path} failed with exit code {result.returncode}")
            logger.error(f"{module_path} errors before failure:\n{result.stderr}")
            push_file_to_repo(log_file, f"Workflow log before failure in {module_path}")
            sys.exit(1)
        logger.info(f"Finished {module_path}")
    except Exception as e:
        logger.exception(f"Unexpected error running {module_path}: {e}")
        push_file_to_repo(log_file, f"Workflow log before failure in {module_path}")
        raise
If a module fails, the log is pushed to GitHub immediately before the workflow exits. This was one of the most useful decisions in the build: a failed run in a cloud environment would otherwise leave nothing behind to debug, whereas this way there is always a retrievable record of what went wrong, regardless of where the pipeline stopped.
push_file_to_repo() authenticates using GITHUB_TOKEN from the Actions environment and commits the file directly to the repository:
def push_file_to_repo(file_path, commit_message):
    """Adds, commits, and pushes a file to GitHub using GITHUB_TOKEN"""
    try:
        repo_url = (
            f"https://x-access-token:{os.environ['GITHUB_TOKEN']}"
            f"@github.com/{os.environ['GITHUB_REPOSITORY']}.git"
        )
        subprocess.run(["git", "config", "user.name", "github-actions"], check=True)
        subprocess.run(["git", "config", "user.email", "[email protected]"], check=True)
        subprocess.run(["git", "add", file_path], check=True)
        # check=False: the commit is a no-op when the file has not changed
        subprocess.run(["git", "commit", "-m", commit_message], check=False)
        subprocess.run(["git", "push", repo_url, "HEAD:main"], check=True)
        logger.info(f"Successfully pushed {file_path} to repo")
    except subprocess.CalledProcessError as e:
        logger.exception(f"Failed to push {file_path}: {e}")
        raise
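On the module side, the graceful skip that run_module() handles is nothing more than an exit code. A minimal sketch of the pattern, with a hypothetical helper name and a simplified boolean check (each real module decides this for itself):
import logging
import sys

logger = logging.getLogger(__name__)

def exit_if_nothing_to_do(new_file_available: bool) -> None:
    """Signal a graceful skip to the orchestrator when there is nothing to process."""
    if not new_file_available:
        logger.info("No new FuelCheck file published yet - skipping this run")
        sys.exit(10)  # run_module() treats exit code 10 as a skip, not a failure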
The ELT flow
The six modules run sequentially. The first two handle extraction and initial loading into staging; everything after that happens inside PostgreSQL via stored procedures.
- file_retrieval.py hits the NSW Government endpoint, downloads the monthly FuelCheck Excel file, converts it to CSV, and commits it to the repository.
- transform_data.py does light Python-side cleaning, expands the monthly snapshot into a complete daily price time series (a simplified sketch of this expansion follows the list), and loads it into stg_fuel_price.
- api_integration.py identifies new stations, inactive stations, and stations with name or address changes, writing results to the relevant staging tables.
- data_quality.py calls two stored procedures in sequence: check_data_quality() to surface defects, then update_data_quality_autofix() to resolve what it can.
- data_update.py promotes validated staging data into the production fact and dimension tables.
- retention_policy.py runs last, deleting workflow logs older than 90 days from GitHub and fact data older than 24 months from the database.
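The monthly-to-daily expansion in transform_data.py is the heaviest Python-side step. A simplified sketch of the idea, assuming the snapshot holds one row per station, fuel type, and price-change date; the column names and the forward-fill rule are assumptions, not the project's exact implementation:
import pandas as pd

def expand_to_daily(snapshot: pd.DataFrame, month_start: str, month_end: str) -> pd.DataFrame:
    """Forward-fill each station/fuel-type price series to one row per day (illustrative columns)."""
    daily_index = pd.date_range(month_start, month_end, freq="D")
    frames = []
    for (station, fuel), grp in snapshot.groupby(["servicestationname", "fueltype"]):
        series = (
            grp.set_index("pricechange_date")["price"]
            .sort_index()
            .reindex(daily_index)
            .ffill()  # carry the last observed price forward to each day
        )
        frame = series.to_frame(name="price")
        frame["servicestationname"] = station
        frame["fueltype"] = fuel
        frames.append(frame.reset_index(names="price_date"))
    return pd.concat(frames, ignore_index=True)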
Data quality: detect, log, autofix
Rather than halting on bad data or silently passing it through, every defect is categorised, logged to a dq_issues table, and where possible fixed automatically before anything touches production.
check_data_quality() runs a series of named checks. Each check has a defect code (e.g. MIS_01, INC_04) and writes results to dq_issues using INSERT ... ON CONFLICT DO UPDATE, so re-runs are idempotent. At the start of every run, all existing defects are marked inactive, and any that are not re-detected are deleted at the end, leaving only live issues in the table.
Here is an example check. MIS_01 identifies price records in staging that cannot be matched to any known station in the dimension table or any of the new or updated station staging tables:
INSERT INTO dq_issues (dq_id, entity, key_var, key_var1, key_var2, attribute_name, attribute_value)
SELECT
'MIS_01',
'stg_fuel_price',
f.servicestationname,
f.address,
'NA',
NULL,
NULL
FROM stg_fuel_price f
WHERE NOT EXISTS (
SELECT 1 FROM dim_fuel_stations d
WHERE UPPER(f.servicestationname) = UPPER(d.name)
AND UPPER(f.address) = UPPER(d.address)
)
AND NOT EXISTS (
SELECT 1 FROM stg_new_stations n
WHERE UPPER(f.servicestationname) = UPPER(n.name)
AND UPPER(f.address) = UPPER(n.address)
)
AND NOT EXISTS (
SELECT 1 FROM stg_updated_stations u
WHERE UPPER(f.servicestationname) = UPPER(u.name)
AND UPPER(f.address) = UPPER(u.address)
)
GROUP BY f.servicestationname, f.address
ON CONFLICT (dq_id, entity, key_var, key_var1, key_var2)
DO UPDATE SET is_active = TRUE;
After all checks run, update_data_quality_autofix() steps through the dq_issues table and applies fixes where the resolution is deterministic. For example, INC_05 handles duplicate active station records by deactivating the oldest one:
WITH duplicates AS (
SELECT
dim.stationid,
ROW_NUMBER() OVER (
PARTITION BY UPPER(dim.name), UPPER(dim.address)
ORDER BY dim.last_update ASC
) AS rn
FROM dim_fuel_stations dim
INNER JOIN dq_issues dq
ON UPPER(dim.name) = dq.key_var
AND UPPER(dim.address) = dq.key_var1
WHERE dq.dq_id = 'INC_05'
AND active = true
)
UPDATE dim_fuel_stations dim
SET active = FALSE
FROM duplicates dup
WHERE dim.stationid = dup.stationid
AND dup.rn = 1;
Every fix is logged to sys_run_log with a row count, giving a complete audit trail of what was corrected on each run.
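Because sys_run_log records the procedure name, the operation, and the rows affected, auditing what a run actually changed is a single query from Python. A sketch, with column names guessed from that description rather than taken from the schema:
from sqlalchemy import text

# Column names are assumptions based on the description of sys_run_log above
audit_sql = text("""
    SELECT procedure_name, operation, rows_affected, logged_at
    FROM sys_run_log
    ORDER BY logged_at DESC
    LIMIT 20
""")
with engine.connect() as conn:
    for row in conn.execute(audit_sql):
        print(row.procedure_name, row.operation, row.rows_affected)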
The data model
The database follows a Kimball-style star schema, designed to be ready for direct use in Power BI. The
central fact table holds price observations keyed to dimension tables for stations, fuel types, and dates.
The staging-to-production promotion checks dq_issues before acting, so production tables only
ever receive data that has passed or been successfully autofixed.
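From the orchestration side, that gate can be as simple as counting unresolved issues before calling the promotion procedure. A sketch; the blocking rule and the procedure name promote_staging_to_production() are illustrative, not the project's actual names:
from sqlalchemy import text

with engine.begin() as conn:
    # is_active mirrors the dq_issues flag used by the checks above; the real rule
    # may ignore defect codes that the autofix has already resolved
    blocking = conn.execute(
        text("SELECT COUNT(*) FROM dq_issues WHERE is_active = TRUE")
    ).scalar_one()
    if blocking == 0:
        conn.execute(text("CALL promote_staging_to_production();"))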
Documentation
Every Python module and stored procedure in this project is documented using a consistent spec format. The headings are the same across every component:
- Module overview — the purpose of the module in plain language
- Upstream dependencies — what this module relies on to run (other modules, files, environment variables)
- Downstream dependencies — what relies on this module having run successfully
- Inputs and outputs — exactly what the module consumes and produces, including file paths, database tables, and config values
- Logic and steps — execution broken into named blocks so the intent of each stage is clear without reading the code
- Conditional checks — situations where the module exits early or skips, and why
- Error handling and logging — how failures are captured, logged, and surfaced to the orchestrator
- Helper functions — reusable functions defined in the module and what they do
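In practice the spec can live at the top of each module as a docstring so that it travels with the code. A skeleton following those headings, filled in for a retrieval-style module (the wording is a template, not copied from the project):
"""
Module overview:
    Downloads the monthly FuelCheck file, converts it to CSV, and commits it to the repository.

Upstream dependencies:
    NSW Government open data portal; GITHUB_TOKEN in the Actions environment.

Downstream dependencies:
    transform_data.py expects the converted CSV to be present in the repository.

Inputs and outputs:
    In:  the monthly Excel file from the portal.
    Out: the CSV committed to the repo; entries in the shared workflow log.

Logic and steps:
    1. Check for a new file  2. Download  3. Convert to CSV  4. Commit.

Conditional checks:
    Exits with code 10 when no new file has been published.

Error handling and logging:
    Failures are logged and surfaced to the orchestrator via a non-zero exit code.

Helper functions:
    Listed here as they are added.
"""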
Good documentation on a solo project can feel optional. It is not. The spec format forces you to think through a module's boundaries before you build it, which catches design problems early. It also means coming back to a module three months later does not require reverse-engineering your own code to remember what it does.
Reflections
The most valuable decision in this project was committing to ELT from the start. Keeping transformation and validation logic in SQL stored procedures — rather than Python — meant that the code lived close to the data, was easy to test in isolation, and did not need to be re-run through the full pipeline to verify a fix. That pattern held up well as the project grew.
The data quality layer took longer to build than the rest of the pipeline combined, but it was worth the investment. The dq_issues table and the autofix stored procedure mean that problems are surfaced explicitly and corrected in a controlled, auditable way rather than crashing the run or silently corrupting the output. That changes how confident you can be in the data downstream.
If I were starting again, I would build the documentation spec before writing the first module, not after. Having to retrofit the spec format to existing modules meant some of the early thinking about dependencies and conditional checks was done backwards. Writing the spec first forces the right questions earlier.
The pipeline is running and the foundation is solid. The next step is connecting it to a Power BI dashboard for exploring fuel price trends by fuel type, region, station, and time — the star schema was designed with this in mind from the start.
Questions about this project or the pipeline design? Get in touch.