Home / Projects
Published April 2026

Building a Cloud-Hosted
Fuel Price Pipeline

How I replaced a manual monthly process with a fully automated ELT pipeline using GitHub Actions, Neon PostgreSQL, and Python.

Python PostgreSQL GitHub Actions Neon
Type Personal Project
Timeline 8 weeks · 2026
Goal Practice Cloud Skills
Data source NSW Government FuelCheck

From manual process to automated pipeline

Every month, the NSW Government publishes a dataset through its FuelCheck program listing petrol prices across the state. It covers every major fuel type including E10, U91, P95, P98, diesel, and LPG, across more than a thousand stations. It is genuinely useful data. And for a while, I had been processing it manually.

The old process had a consistent set of problems. I had to check the NSW Government website each month to see if a new file was available, then download it and run a Python script locally. Errors required manual investigation with no structured log to reference. The database lived on my PC with no remote access, so nothing ran if I was not at my desk. And there was no data quality layer at all — a bad record either caused a crash or slipped through silently into the fact table.

⚠️

The core problem was not any single one of these issues — it was that they compounded. No logging meant errors were invisible. No remote access meant the pipeline was tied to one machine. No DQ layer meant bad data had nowhere to be caught. Fixing one without the others would not have solved much.

The goal

Rebuild the pipeline as a fully automated, cloud-hosted system with zero local dependencies, structured logging, and a proper data quality layer. Specifically:

Compute
GitHub Actions on a monthly CRON schedule — runs whether I am at my desk or not
Storage
Neon PostgreSQL hosted in the cloud — no local database, accessible from anywhere
Scripts
Python modules version-controlled in GitHub — no local files, fully reproducible
Quality
Automated DQ checks and autofix stored procedures — bad data is caught and corrected before it reaches production
🎯

The goal: a fully automated, cloud-hosted ELT pipeline with zero local dependencies, structured logging on every run, and automated data quality checks that catch and fix problems before they reach production.

Environment and components

The pipeline is made up of five distinct layers that work together: Python modules, API connections, stored procedures, the database, and logging.

Python modules handle orchestration, file retrieval, and data loading. They are thin by design. Each module is responsible for a single stage of the pipeline and hands off to the next via the orchestrator. Transformation and validation logic lives in the database, not in Python.

API connections are how the Python modules talk to the outside world. SQLAlchemy manages the connection to Neon PostgreSQL, handling query execution and data loading. The NSW Government open data portal is called once per run by the file retrieval module to download the latest monthly file. GitHub's API is used throughout the pipeline to commit log files and config updates back to the repository.

Stored procedures are where the meaningful logic lives. Because data is loaded into staging tables early, all quality checking, autofixing, and production promotion is written in SQL and executed as callable procedures from Python. This keeps the logic close to the data, version-controlled, and easy to test independently.

The database is a Neon PostgreSQL instance organised into three layers:

Logging operates at two levels. The orchestrator generates a timestamped log file for every run, capturing the start and end of each module, any skip events, and the full stderr output of any failure before the workflow exits. These files are committed to GitHub immediately on failure and at the end of every successful run. Inside the database, the sys_run_log table tracks stored procedure execution, recording the procedure name, the operation performed, and the number of rows affected. The retention policy automatically clears workflow logs in GitHub older than 90 days and deletes database records older than 24 months.

Architecture overview

The pipeline runs on a monthly CRON schedule defined in GitHub Actions, aligned to the NSW Government's release cycle. There is nothing saved locally. Compute runs in GitHub Actions, scripts are version-controlled in GitHub, and data lives in Neon PostgreSQL.

YAML
# pipeline.yml (simplified)
on:
  schedule:
    - cron: '0 6 2 * *'  # 6am on the 2nd of each month
  workflow_dispatch:       # manual trigger for testing

The key architectural decision was to push data into PostgreSQL as early as possible and do all meaningful transformation and validation inside the database. This is an ELT approach: raw data lands in staging tables first, and the logic lives in SQL from that point forward. The previous version processed everything in Python before writing to the database. Moving to ELT means the transformation logic is closer to the data, easier to test, and straightforward to extend.

ℹ️

This is an ELT pipeline, not ETL. The distinction matters: data lands in PostgreSQL first, and all transformation, quality checking, and promotion logic lives in SQL stored procedures. Python modules are thin orchestration layers only — they do not process data themselves.

The orchestrator

The orchestrator has two core functions: run_module() and push_file_to_repo().

run_module() executes each Python module as a subprocess, passes the shared log file path as an argument, and handles three possible outcomes: a clean exit, a graceful skip (return code 10, used when conditions are not met such as no new file being available), and a failure.

Python
def run_module(module_path):
    """Runs python files as a subprocess"""
    try:
        logger.info(f"Starting {module_path}")

        result = subprocess.run(
            ["python", module_path, "--log-file", log_file],
            check=False,
            capture_output=True,
            text=True,
            timeout=60
        )
        if result.returncode == 10:
            logger.info(f"Conditions not met in {module_path} - Skipping Module")
            return

        logger.info(f"Finished {module_path}")

        if result.returncode != 0:
            logger.error(f"Module {module_path} failed with exit code {result.returncode}")
            logger.error(f"{module_path} errors before failure:\n{result.stderr}")
            push_file_to_repo(log_file, f"Workflow log before failure in {module_path}")
            sys.exit(1)

    except Exception as e:
        logger.exception(f"Unexpected error running {module_path}: {e}")
        push_file_to_repo(log_file, f"Workflow log before failure in {module_path}")
        raise

If a module fails, the log is pushed to GitHub immediately before the workflow exits. This means there is always a retrievable record of what went wrong, even if the run never completes.

💡

Pushing the log to GitHub on failure before the workflow exits was one of the most useful decisions in the build. Without it, a failed run in a cloud environment leaves nothing behind to debug. With it, there is always a retrievable record regardless of where the pipeline stopped.

push_file_to_repo() authenticates using GITHUB_TOKEN from the Actions environment and commits the file directly to the repository:

Python
def push_file_to_repo(file_path, commit_message):
    """Adds, commits, and pushes a file to GitHub using GITHUB_TOKEN"""
    try:
        repo_url = (
            f"https://x-access-token:{os.environ['GITHUB_TOKEN']}"
            f"@github.com/{os.environ['GITHUB_REPOSITORY']}.git"
        )
        subprocess.run(["git", "config", "user.name", "github-actions"], check=True)
        subprocess.run(["git", "config", "user.email", "[email protected]"], check=True)
        subprocess.run(["git", "add", file_path], check=True)
        subprocess.run(["git", "commit", "-m", commit_message], check=False)
        subprocess.run(["git", "push", repo_url, "HEAD:main"], check=True)
        logger.info(f"Successfully pushed {file_path} to repo")

    except subprocess.CalledProcessError as e:
        logger.exception(f"Failed to push {file_path}: {e}")
        raise

The ELT flow

The six modules run sequentially. The first two handle extraction and initial loading into staging; everything after that happens inside PostgreSQL via stored procedures.

  1. file_retrieval.py hits the NSW Government endpoint, downloads the monthly FuelCheck Excel file, converts it to CSV, and commits it to the repository.
  2. transform_data.py does light Python-side cleaning, expands the monthly snapshot into a complete daily price time series, and loads it into stg_fuel_price.
  3. api_integration.py identifies new stations, inactive stations, and stations with name or address changes, writing results to the relevant staging tables.
  4. data_quality.py calls two stored procedures in sequence: check_data_quality() to surface defects, then update_data_quality_autofix() to resolve what it can.
  5. data_update.py promotes validated staging data into the production fact and dimension tables.
  6. retention_policy.py runs last, deleting workflow logs older than 90 days from GitHub and fact data older than 24 months from the database.

Data quality: detect, log, autofix

Rather than halting on bad data or silently passing it through, every defect is categorised, logged to a dq_issues table, and where possible fixed automatically before anything touches production.

check_data_quality() runs a series of named checks. Each check has a defect code (e.g. MIS_01, INC_04) and writes results to dq_issues using INSERT ... ON CONFLICT DO UPDATE, so re-runs are idempotent. At the start of every run, all existing defects are marked inactive and any that are not re-detected are deleted at the end, leaving only live issues in the table.

Here is an example check. MIS_01 identifies price records in staging that cannot be matched to any known station in the dimension table or any of the new or updated station staging tables:

SQL
INSERT INTO dq_issues (dq_id, entity, key_var, key_var1, key_var2, attribute_name, attribute_value)
SELECT
    'MIS_01',
    'stg_fuel_price',
    f.servicestationname,
    f.address,
    'NA',
    NULL,
    NULL
FROM stg_fuel_price f
WHERE NOT EXISTS (
    SELECT 1 FROM dim_fuel_stations d
    WHERE UPPER(f.servicestationname) = UPPER(d.name)
    AND UPPER(f.address) = UPPER(d.address)
)
AND NOT EXISTS (
    SELECT 1 FROM stg_new_stations n
    WHERE UPPER(f.servicestationname) = UPPER(n.name)
    AND UPPER(f.address) = UPPER(n.address)
)
AND NOT EXISTS (
    SELECT 1 FROM stg_updated_stations u
    WHERE UPPER(f.servicestationname) = UPPER(u.name)
    AND UPPER(f.address) = UPPER(u.address)
)
GROUP BY f.servicestationname, f.address
ON CONFLICT (dq_id, entity, key_var, key_var1, key_var2)
DO UPDATE SET is_active = TRUE;

After all checks run, update_data_quality_autofix() steps through the dq_issues table and applies fixes where the resolution is deterministic. For example, INC_05 handles duplicate active station records by deactivating the oldest one:

SQL
WITH duplicates AS (
    SELECT
        dim.stationid,
        ROW_NUMBER() OVER (
            PARTITION BY UPPER(dim.name), UPPER(dim.address)
            ORDER BY dim.last_update ASC
        ) AS rn
    FROM dim_fuel_stations dim
    INNER JOIN dq_issues dq
        ON UPPER(dim.name) = dq.key_var
        AND UPPER(dim.address) = dq.key_var1
    WHERE dq.dq_id = 'INC_05'
    AND active = true
)
UPDATE dim_fuel_stations dim
SET active = FALSE
FROM duplicates dup
WHERE dim.stationid = dup.stationid
AND dup.rn = 1;

Every fix is logged to sys_run_log with a row count, giving a complete audit trail of what was corrected on each run.

The data model

The database follows a Kimball-style star schema, designed to be ready for direct use in Power BI. The central fact table holds price observations keyed to dimension tables for stations, fuel types, and dates. The staging-to-production promotion checks dq_issues before acting, so production tables only ever receive data that has passed or been successfully autofixed.

Documentation

Every Python module and stored procedure in this project is documented using a consistent spec format. The headings are the same across every component:

Good documentation on a solo project can feel optional. It is not. The spec format forces you to think through a module's boundaries before you build it, which catches design problems early. It also means coming back to a module three months later does not require reverse-engineering your own code to remember what it does.

Reflections

The most valuable decision in this project was committing to ELT from the start. Keeping transformation and validation logic in SQL stored procedures — rather than Python — meant that the code lived close to the data, was easy to test in isolation, and did not need to be re-run through the full pipeline to verify a fix. That pattern held up well as the project grew.

The data quality layer took longer to build than the rest of the pipeline combined, but it was worth the investment. The dq_issues table and the autofix stored procedure mean that problems are surfaced explicitly and corrected in a controlled, auditable way rather than crashing the run or silently corrupting the output. That changes how confident you can be in the data downstream.

If I were starting again, I would build the documentation spec before writing the first module, not after. Having to retrofit the spec format to existing modules meant some of the early thinking about dependencies and conditional checks was done backwards. Writing the spec first forces the right questions earlier.

The pipeline is running and the foundation is solid. The next step is connecting it to a Power BI dashboard for exploring fuel price trends by fuel type, region, station, and time — the star schema was designed with this in mind from the start.


Questions about this project or the pipeline design? Get in touch.

↑ Back to top