Automatically decompose nested JSON in your data warehouse into normalized dbt models.
Forge Core is a deterministic BFS engine that reads a single JSON column (or multi-column table), discovers all nested structures, and generates:
- dbt SQL models: one per nested object/array
- Rollup view: reassembles the full document from normalized tables
- schema.yml: structural column inventory
- JSON Schema: standard draft-07 schema of the discovered structure
- Mermaid ER diagram: table relationship visualization
- dbt docs: browseable documentation site
Supported Warehouses
| Warehouse | Install Extra | Status |
|---|---|---|
| BigQuery | foxtrotcommunications-forge-core[bigquery] | ✅ Production |
| Snowflake | foxtrotcommunications-forge-core[snowflake] | ✅ Production |
| Databricks | foxtrotcommunications-forge-core[databricks] | ✅ Production |
| Redshift | foxtrotcommunications-forge-core[redshift] | 🚧 Beta |
Quickstart
pip install foxtrotcommunications-forge-core[bigquery]

forge-core build \
  --source-type bigquery \
  --source-project my-gcp-project \
  --source-database my_dataset \
  --source-table my_json_table \
  --target-dataset my_target
Or use the Python API:
from forge_core import build_core

result = build_core(
    source_type="bigquery",
    source_project="my-gcp-project",
    source_database="my_dataset",
    source_table_name="my_json_table",
    target_dataset="my_target",
)

print(f"Created {result.total_models_created} models")
print(f"Processed {result.total_rows_processed} rows")
Enabling progress output
Forge Core uses Python's standard logging module. By default nothing is printed — add this before your build_core() call to stream progress to the console:
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%H:%M:%S",
)
logging.getLogger("forge_core").setLevel(logging.INFO)
This works in Jupyter notebooks, plain scripts, Airflow (which routes log records through its own handler automatically), and any CI/CD environment that captures console output. Note that logging.basicConfig writes to stderr by default, not stdout.
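Because logging.basicConfig attaches a handler that writes to stderr unless told otherwise, a runner that only captures stdout may show nothing. A small helper along these lines could send Forge Core's log records to a stream of your choice. The helper name stream_forge_logs is hypothetical and not part of Forge Core; the logger name forge_core is taken from the snippet above.

```python
import logging
import sys


def stream_forge_logs(stream=None, level=logging.INFO):
    """Attach a handler to the "forge_core" logger that writes to `stream`.

    Hypothetical helper, not part of Forge Core. Defaults to sys.stdout.
    Returns the handler so the caller can remove it later.
    """
    logger = logging.getLogger("forge_core")
    handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
    )
    logger.addHandler(handler)
    logger.setLevel(level)
    return handler
```

Call stream_forge_logs() once before build_core(). If you also call logging.basicConfig, each record will appear twice (once per handler), so pick one of the two approaches.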
How It Works
┌─────────────────────────────┐
│ Source Table (JSON column)  │
└─────────────┬───────────────┘
              │
              ▼
┌─────────────────────────────┐
│ 1. Root Model (frg)         │  Parse JSON → root SELECT
└─────────────┬───────────────┘
              │
              ▼
┌─────────────────────────────┐
│ 2. BFS Discovery Loop       │  For each level:
│    - Discover keys          │    • get_keys() → field names
│    - Infer types            │    • get_types() → STRUCT/ARRAY/scalar
│    - Generate SQL model     │    • create_file_in_models()
│    - dbt build              │    • run_dbt_command()
│    - Tag as excluded        │    • tag_models_as_excluded()
│    - Queue children         │    • next_batch.extend()
└─────────────┬───────────────┘
              │
              ▼
┌─────────────────────────────┐
│ 3. Rollup View              │  JOIN all tables back into
│    (frg__rollup)            │  nested STRUCT/ARRAY form
└─────────────┬───────────────┘
              │
              ▼
┌─────────────────────────────┐
│ 4. Artifacts                │  schema.yml, JSON Schema,
│                             │  Mermaid diagram, dbt docs
└─────────────────────────────┘
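The breadth-first order of step 2 can be illustrated with a minimal in-memory sketch. This is not Forge Core's implementation (per the diagram, the real loop generates SQL models and runs dbt at each level); it only shows how nested objects and arrays of objects are discovered one level at a time. The function name discover_tables is hypothetical, and the path format is modeled on the table_path example root__experiments__team.

```python
from collections import deque


def discover_tables(document):
    """Breadth-first walk over one parsed JSON document.

    Returns a list of (path, scalar_columns) pairs, one per nested object
    or array of objects, in the order a BFS visits them. Arrays of scalars
    are treated as ordinary columns in this sketch.
    """
    tables = []
    queue = deque([("root", [document])])  # (path, rows found at that level)
    while queue:
        path, rows = queue.popleft()
        scalars, children = [], {}
        for row in rows:
            for key, value in row.items():
                if isinstance(value, dict):
                    children.setdefault(key, []).append(value)
                elif isinstance(value, list) and all(isinstance(v, dict) for v in value):
                    # Array of objects; an empty array contributes no rows.
                    children.setdefault(key, []).extend(value)
                elif key not in scalars:
                    scalars.append(key)
        tables.append((path, scalars))
        # Queue the next level only after the current one is fully processed.
        for key, child_rows in children.items():
            if child_rows:
                queue.append((f"{path}__{key}", child_rows))
    return tables
```

For a document with a patient_id field and an experiments array whose elements each hold a team array, this returns entries for root, root__experiments, and root__experiments__team, in that order.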
Authentication
Forge Core uses standard warehouse authentication:
- BigQuery: Application Default Credentials (gcloud auth application-default login) or GOOGLE_APPLICATION_CREDENTIALS
- Snowflake: SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_PRIVATE_KEY_PATH, etc.
- Databricks: DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, DATABRICKS_ACCESS_TOKEN
- Redshift: REDSHIFT_HOST, REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_DATABASE
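A preflight check before calling build_core() can catch missing credentials early. The sketch below checks only the environment variables named in the list above; the helper missing_credentials is hypothetical, and the source_type values other than "bigquery" are assumed to match the install extras. BigQuery is not checked, since Application Default Credentials may be present without any environment variable.

```python
import os

# Variable names come from the list above. Snowflake's list ends in "etc.",
# so only the three variables it names are checked here.
REQUIRED_ENV = {
    "snowflake": [
        "SNOWFLAKE_ACCOUNT",
        "SNOWFLAKE_USER",
        "SNOWFLAKE_PRIVATE_KEY_PATH",
    ],
    "databricks": [
        "DATABRICKS_SERVER_HOSTNAME",
        "DATABRICKS_HTTP_PATH",
        "DATABRICKS_ACCESS_TOKEN",
    ],
    "redshift": [
        "REDSHIFT_HOST",
        "REDSHIFT_USER",
        "REDSHIFT_PASSWORD",
        "REDSHIFT_DATABASE",
    ],
}


def missing_credentials(source_type, env=None):
    """Return the expected variables that are unset or empty for a warehouse.

    Hypothetical helper. Unknown warehouses (including "bigquery") yield an
    empty list because nothing can be verified from the environment alone.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV.get(source_type, []) if not env.get(name)]
```

An empty result means every named variable is set; it does not prove the credentials are valid.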
Project Structure
After a build, your project directory looks like:
forge_project/
├── dbt_project.yml
├── profiles.yml                # Auto-generated
├── macros/
│   └── incremental_tmp_table_dropper.sql
├── models/
│   ├── frg.sql                 # Root model
│   ├── frg__root__....sql      # Unnested models (one per level)
│   ├── frg__rollup.sql         # Rollup view
│   └── schema.yml              # Column inventory
└── target/
    ├── schema.json             # JSON Schema
    ├── schema.mmd              # Mermaid diagram
    └── index.html              # dbt docs
Use in Airflow / Containers
# Airflow PythonOperator
from forge_core import build_core


def forge_task(**context):
    result = build_core(
        source_type="bigquery",
        source_project="my-project",
        source_database="raw",
        source_table_name="api_responses",
        target_dataset="normalized",
        project_dir="/tmp/forge_project",
    )
    return result.total_models_created
Understanding the Generated Schema
Key Columns
Every table generated by Forge Core contains these system columns:
| Column | Type | Description |
|---|---|---|
| ingestion_hash | STRING | Hash of the source row. Groups all decomposed tables that came from the same original JSON document. |
| idx | STRING | Composite positional key. Encodes the exact path through nested arrays to reach this row. |
| ingestion_timestamp | TIMESTAMP | When the row was ingested. |
| table_path | STRING | Hierarchical path describing the nesting lineage (e.g., root__experiments__team). |
How idx Works
The idx column is a _-delimited string that grows one segment per nesting level:
Depth 0 (root): idx = "1"
Depth 1 (child): idx = "1_2" ← root row 1, child element 2
Depth 2 (grandchild): idx = "1_2_3" ← root row 1, child 2, grandchild 3
Depth 3 (great-grand): idx = "1_2_3_1" ← root row 1, child 2, grandchild 3, great-grandchild 1
Each segment represents the array position at that nesting level. This means:
- Every child row carries its full ancestry in idx.
- To find a child's parent, strip the last segment.
- To join parent ↔ child, match every segment of the parent's idx against the segment at the same position in the child's idx.
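The three properties above translate directly into string operations. The following sketch uses hypothetical helper names (idx_depth, parent_idx, is_child_of) and assumes only what the examples show: segments are separated by a single underscore and the root's idx has one segment.

```python
def idx_depth(idx):
    """Nesting depth encoded by an idx: "1" is depth 0, "1_2" is depth 1."""
    return idx.count("_")


def parent_idx(idx):
    """Strip the last segment to get the parent's idx; the root has no parent."""
    head, sep, _last = idx.rpartition("_")
    return head if sep else None


def is_child_of(child, parent):
    """True when `child` sits exactly one nesting level below `parent`."""
    return parent_idx(child) == parent
```

For example, parent_idx("1_2_3") gives "1_2" and idx_depth("1_2_3_1") gives 3. A real match between rows also requires equal ingestion_hash values, since idx is only unique within one source document.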
Joining Parent to Child Tables
The rule: for each segment in the parent's idx, add one equality condition comparing that segment position in both parent and child. A parent whose idx has N segments (that is, a parent at depth N-1, since the root at depth 0 already has one segment) needs N index conditions.
BigQuery
-- Depth 0 → 1: root (idx="1") → experiments (idx="1_2")
-- Parent has 1 segment → 1 index condition
SELECT r.*, e.experiment_name, e.experiment_status
FROM `project.dataset.frg__root` r
JOIN `project.dataset.frg__root__expe1` e
  ON r.ingestion_hash = e.ingestion_hash
  AND SPLIT(r.idx, '_')[OFFSET(0)] = SPLIT(e.idx, '_')[OFFSET(0)]

-- Depth 1 → 2: experiments (idx="1_2") → team (idx="1_2_3")
-- Parent has 2 segments → 2 index conditions
SELECT e.*, t.team_name, t.team_role
FROM `project.dataset.frg__root__expe1` e
JOIN `project.dataset.frg__root__expe1__team1` t
  ON e.ingestion_hash = t.ingestion_hash
  AND SPLIT(e.idx, '_')[OFFSET(0)] = SPLIT(t.idx, '_')[OFFSET(0)]
  AND SPLIT(e.idx, '_')[OFFSET(1)] = SPLIT(t.idx, '_')[OFFSET(1)]

-- Depth 2 → 3: team (idx="1_2_3") → lab_results (idx="1_2_3_1")
-- Parent has 3 segments → 3 index conditions
SELECT t.*, l.lab_name, l.result_value
FROM `project.dataset.frg__root__expe1__team1` t
JOIN `project.dataset.frg__root__expe1__team1__lab_1` l
  ON t.ingestion_hash = l.ingestion_hash
  AND SPLIT(t.idx, '_')[OFFSET(0)] = SPLIT(l.idx, '_')[OFFSET(0)]
  AND SPLIT(t.idx, '_')[OFFSET(1)] = SPLIT(l.idx, '_')[OFFSET(1)]
  AND SPLIT(t.idx, '_')[OFFSET(2)] = SPLIT(l.idx, '_')[OFFSET(2)]

-- Three-level join: root → experiments → team
SELECT r.patient_id, e.experiment_name, t.team_name
FROM `project.dataset.frg__root` r
JOIN `project.dataset.frg__root__expe1` e
  ON r.ingestion_hash = e.ingestion_hash
  AND SPLIT(r.idx, '_')[OFFSET(0)] = SPLIT(e.idx, '_')[OFFSET(0)]
JOIN `project.dataset.frg__root__expe1__team1` t
  ON e.ingestion_hash = t.ingestion_hash
  AND SPLIT(e.idx, '_')[OFFSET(0)] = SPLIT(t.idx, '_')[OFFSET(0)]
  AND SPLIT(e.idx, '_')[OFFSET(1)] = SPLIT(t.idx, '_')[OFFSET(1)]
Snowflake
-- Depth 0 → 1: root → experiments (1 condition)
SELECT r.*, e."experiment_name"
FROM "DATASET"."FRG__ROOT" r
JOIN "DATASET"."FRG__ROOT__EXPE1" e
  ON r."ingestion_hash" = e."ingestion_hash"
  AND SPLIT_PART(r."idx", '_', 1) = SPLIT_PART(e."idx", '_', 1)

-- Depth 1 → 2: experiments → team (2 conditions)
SELECT e.*, t."team_name"
FROM "DATASET"."FRG__ROOT__EXPE1" e
JOIN "DATASET"."FRG__ROOT__EXPE1__TEAM1" t
  ON e."ingestion_hash" = t."ingestion_hash"
  AND SPLIT_PART(e."idx", '_', 1) = SPLIT_PART(t."idx", '_', 1)
  AND SPLIT_PART(e."idx", '_', 2) = SPLIT_PART(t."idx", '_', 2)
General Join Formula
For a parent whose idx has N segments (depth N-1) joining to a child whose idx has N+1 segments, expand N index conditions, one per segment of the parent's idx:
parent.ingestion_hash = child.ingestion_hash
AND SPLIT(parent.idx, '_')[OFFSET(0)] = SPLIT(child.idx, '_')[OFFSET(0)]
AND SPLIT(parent.idx, '_')[OFFSET(1)] = SPLIT(child.idx, '_')[OFFSET(1)]
...
AND SPLIT(parent.idx, '_')[OFFSET(N-1)] = SPLIT(child.idx, '_')[OFFSET(N-1)]
The child always has one more segment than the parent — that final segment is the child's own position within the parent array.
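Writing these conditions by hand gets tedious at deeper levels, so it may help to generate them. The sketch below builds the ON clause from the number of segments in the parent's idx (1 for the root, whose idx is "1"). The function join_condition and its parameters are hypothetical; the SQL it emits mirrors the BigQuery and Snowflake examples above, including Snowflake's quoted column names and 1-based SPLIT_PART.

```python
def _col(alias, name, dialect):
    # The Snowflake examples quote column names; the BigQuery ones do not.
    return f'{alias}."{name}"' if dialect == "snowflake" else f"{alias}.{name}"


def _segment(alias, position, dialect):
    idx = _col(alias, "idx", dialect)
    if dialect == "snowflake":
        return f"SPLIT_PART({idx}, '_', {position + 1})"  # SPLIT_PART is 1-based
    return f"SPLIT({idx}, '_')[OFFSET({position})]"  # OFFSET is 0-based


def join_condition(parent_segments, parent="parent", child="child", dialect="bigquery"):
    """Build the ON clause for a parent whose idx has `parent_segments` segments."""
    if dialect not in ("bigquery", "snowflake"):
        raise ValueError(f"unsupported dialect: {dialect}")
    if parent_segments < 1:
        raise ValueError("a parent idx has at least one segment")
    conditions = [
        f"{_col(parent, 'ingestion_hash', dialect)} = {_col(child, 'ingestion_hash', dialect)}"
    ]
    for position in range(parent_segments):
        left = _segment(parent, position, dialect)
        right = _segment(child, position, dialect)
        conditions.append(f"{left} = {right}")
    return "\n  AND ".join(conditions)
```

Calling join_condition(2, "e", "t") produces the hash equality followed by two SPLIT comparisons, matching the experiments-to-team join shown earlier.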
Table Naming Convention
Table names encode the nesting path with truncated field names:
frg__root ← root extraction
frg__root__expe1 ← root.experiments (truncated to 4 chars + counter)
frg__root__expe1__team1 ← root.experiments[].team
frg__root__expe1__team1__lab_1 ← root.experiments[].team[].lab_results
frg__root__hosp1__staf1__nurs1 ← root.hospital[].staff[].nurses
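Reading a table name back into its lineage is a matter of splitting on the double underscore. The sketch below assumes, based only on the examples above, that levels are always separated by "__" and that a truncated field name never contains "__" itself (a single underscore, as in lab_1, is fine). The helper names are hypothetical.

```python
def table_lineage(table_name, prefix="frg"):
    """Split a generated table name into its path segments after the prefix.

    Hypothetical helper: "frg__root__expe1__team1" -> ["root", "expe1", "team1"].
    """
    parts = table_name.split("__")
    if len(parts) < 2 or parts[0] != prefix:
        raise ValueError(f"not a {prefix}__ table name: {table_name}")
    return parts[1:]


def expected_idx_segments(table_name, prefix="frg"):
    """Number of idx segments rows in this table carry (1 for frg__root)."""
    return len(table_lineage(table_name, prefix))
```

This ties the naming convention to the idx column: frg__root__expe1__team1__lab_1 has four path segments, matching the four-segment idx "1_2_3_1" shown for that depth. Names such as frg__rollup are not lineage tables and should not be passed to these helpers.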
The Rollup View
The frg__rollup view automatically reassembles all normalized tables back into nested STRUCT/ARRAY form — reconstructing the original JSON shape as queryable warehouse-native types. Use it when you want the full document without manual joins.
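To make the idea of reassembly concrete, here is an in-memory analogue of one rollup step. It is only an illustration: the actual frg__rollup view is generated SQL, and its internals are not described here. The function nest_children is hypothetical; it assumes rows are dicts carrying the ingestion_hash and idx system columns.

```python
def nest_children(parents, children, field):
    """Attach each child row to its parent under `field`.

    A child belongs to a parent when both share an ingestion_hash and the
    child's idx, with its last segment stripped, equals the parent's idx.
    Parents without children get an empty list.
    """
    by_parent = {}
    for child in children:
        key = (child["ingestion_hash"], child["idx"].rpartition("_")[0])
        by_parent.setdefault(key, []).append(child)
    return [
        {**parent, field: by_parent.get((parent["ingestion_hash"], parent["idx"]), [])}
        for parent in parents
    ]
```

Applying this from the deepest level upward (team into experiments, then experiments into root) rebuilds the nested document shape one level at a time.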
License
Apache 2.0
























