Ingest Postgres into your Lakehouse with Lakeflow
Get the configuration JSON to customize your pipelines.
Databricks Lakeflow Connect can sync multiple Postgres databases to the Lakehouse. This post shows how to set up a single pipeline with multiple source databases and multiple target UC schemas, and how to specify compute size to minimize costs.
The first section explains how replication from Postgres works and how to configure it. The second shows how to configure Lakeflow pipelines in your Databricks workspace to ingest from Postgres.
How PostgreSQL logical replication works
PostgreSQL writes every change (insert, update, delete) to the Write-Ahead Log (WAL) before applying it. Logical replication decodes that WAL stream into a human-readable format that consumers can subscribe to.
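To see what that decoded stream looks like, you can create a throwaway slot with the built-in test_decoding plugin (an illustration only — Lakeflow itself uses the pgoutput plugin, and the slot name here is hypothetical):

```sql
-- Create a throwaway slot using the built-in test_decoding plugin
SELECT pg_create_logical_replication_slot('demo_slot', 'test_decoding');

-- After running some DML in this database, peek at the decoded changes
-- without consuming them
SELECT data FROM pg_logical_slot_peek_changes('demo_slot', NULL, 10);

-- Drop the slot so it does not retain WAL
SELECT pg_drop_replication_slot('demo_slot');
```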
Three objects are involved:
1. WAL level
Logical replication requires wal_level = logical. On AWS RDS, set rds.logical_replication = 1 in your parameter group (requires a reboot).
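Once the server is back up, you can verify the setting:

```sql
-- Should return 'logical'; on RDS this reflects rds.logical_replication
SHOW wal_level;
```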
2. Publication
A publication is a named filter over which tables to expose for replication. It lives inside a single database.
-- Replicate specific tables
CREATE PUBLICATION mfg_paloalto_cdc_pub FOR TABLE iot1.devices, iot1.alerts;
-- Or replicate all tables in the database
CREATE PUBLICATION mfg_paloalto_cdc_pub FOR ALL TABLES;

One publication per database is typical. If you have multiple PostgreSQL databases on the same server, each needs its own publication.
3. Replication slot
A replication slot is a cursor into the WAL. PostgreSQL retains WAL segments until the slot consumer confirms it has processed them (confirmed_flush_lsn). This guarantees the consumer never misses a change, even if it disconnects temporarily.
SELECT pg_create_logical_replication_slot('databricks_mfg_paloalto_slot', 'pgoutput');

Key facts about slots:
Slots are server-wide — the name must be unique across the entire PostgreSQL instance, not just within a database.
One slot per consumer — a slot can only be consumed by one connection at a time. If a second pipeline tries to use the same slot, it will be rejected.
Unacknowledged WAL accumulates — if a pipeline is stopped and not consuming, the slot holds back WAL. Monitor slot lag to avoid disk pressure.
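A query along these lines shows how much WAL each slot is holding back (computed from confirmed_flush_lsn in pg_replication_slots):

```sql
-- How much WAL each slot is retaining (0 bytes means fully caught up)
SELECT slot_name,
       active,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
       ) AS retained_wal
FROM pg_replication_slots;
```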
Replication user
Create a dedicated user with replication privileges and read access to the tables being replicated:
CREATE USER databricks_replication WITH PASSWORD '...' REPLICATION;
-- On RDS, also grant the rds_replication role
GRANT rds_replication TO databricks_replication;
GRANT CONNECT ON DATABASE mydb TO databricks_replication;
GRANT USAGE ON SCHEMA iot1 TO databricks_replication;
GRANT SELECT ON ALL TABLES IN SCHEMA iot1 TO databricks_replication;

Replica identity
For CDC updates and deletes to include the old row values (needed to identify which row changed), each table needs a replica identity:
-- Default: uses the primary key (recommended when a PK exists)
ALTER TABLE iot1.devices REPLICA IDENTITY DEFAULT;
-- Full: includes all columns (required when there is no PK)
ALTER TABLE iot1.devices REPLICA IDENTITY FULL;

One slot per consumer, not per catalog
The Databricks docs suggest creating one replication slot per source catalog. A more precise rule is: one replication slot per consumer (ingestion pipeline).
If you want to replicate the same database to two different workspaces, or run multiple pipelines in a test environment, each needs its own slot. Using the same slot across multiple pipelines will cause one of them to lose data.
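To check which slots exist and whether a consumer is currently attached, query the pg_replication_slots view:

```sql
-- active = true means a consumer is connected; active_pid identifies it
SELECT slot_name, database, active, active_pid
FROM pg_replication_slots;
```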
Multiple databases on the same server
PostgreSQL publications are per-database, but replication slots are server-wide. If you have two databases (mfg_paloalto and mfg_austin) on the same server, create two replication slots with different names.
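Concretely, using the example names from this post, the setup might look like this (logical slots are bound to the database you are connected to when you create them, even though their names are server-wide):

```sql
-- While connected to the mfg_paloalto database:
CREATE PUBLICATION mfg_paloalto_cdc_pub FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('databricks_mfg_paloalto_slot', 'pgoutput');

-- While connected to the mfg_austin database:
CREATE PUBLICATION mfg_austin_cdc_pub FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('databricks_mfg_austin_slot', 'pgoutput');
```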
Server
├── Database: mfg_paloalto
│ └── Publication: mfg_paloalto_cdc_pub (per-database)
├── Database: mfg_austin
│ └── Publication: mfg_austin_cdc_pub (per-database)
│
├── Slot: databricks_mfg_paloalto_slot (server-wide, for mfg_paloalto DB)
└── Slot: databricks_mfg_austin_slot (server-wide, for mfg_austin DB)

Databricks side
Three entities need to be created, in order. The sections below show the JSON payloads sent with the Databricks CLI.
1. Unity Catalog connection
The connection is at the server level — no database name. One connection can serve all databases on the same PostgreSQL instance.
// connection.json
{
  "name": "my-postgres-connection",
  "connection_type": "POSTGRESQL",
  "options": {
    "host": "myinstance.abc123.us-east-1.rds.amazonaws.com",
    "port": "5432",
    "user": "databricks_replication",
    "password": "<password>"
  }
}

databricks connections create --json @connection.json

A single connection can be used to ingest from multiple databases, as long as the user has permissions.
2. Gateway pipeline
The gateway pipeline runs continuously on a classic cluster. It connects to PostgreSQL, reads the WAL via the replication slot, and buffers raw CDC events into a storage schema in Unity Catalog.
// gateway.json
{
  "name": "my-postgres-gateway",
  "catalog": "my_catalog",
  "schema": "cdc_gateway_storage",
  "channel": "CURRENT",
  "continuous": true,
  "gateway_definition": {
    "connection_name": "my-postgres-connection",
    "gateway_storage_catalog": "my_catalog",
    "gateway_storage_schema": "cdc_gateway_storage"
  },
  "clusters": [
    {
      "label": "default",
      "driver_node_type_id": "r5.xlarge",
      "node_type_id": "r5.xlarge",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    }
  ]
}

If your CDC stream is low volume, setting min_workers = max_workers = 0 gives you a driver-only cluster.
databricks pipelines create --json @gateway.json

Note the pipeline_id returned — it is needed for the ingestion pipeline.
3. Ingestion pipeline
The ingestion pipeline reads buffered events from the gateway and writes them as Delta tables. It runs on serverless and is triggered (not continuous).
source_configurations maps each source database to its replication slot and publication. (Unfortunately, the Postgres source database is referred to as catalog.source_catalog, which is a little confusing.) objects controls which schemas or tables to replicate and where they land.
The JSON below creates a single pipeline from two source schemas in different databases (mfg_paloalto and mfg_austin). Notice how each slot name corresponds to its source database.
// ingestion.json
{
  "name": "my-postgres-ingestion",
  "catalog": "my_catalog",
  "schema": "mfg_ingest_metadata",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "POSTGRESQL",
    "connection_name": null,
    "objects": [
      {
        "schema": {
          "source_catalog": "mfg_paloalto",
          "source_schema": "iot1",
          "destination_catalog": "my_catalog",
          "destination_schema": "mfg_paloalto_iot1"
        }
      },
      {
        "schema": {
          "source_catalog": "mfg_austin",
          "source_schema": "iot1",
          "destination_catalog": "my_catalog",
          "destination_schema": "mfg_austin_iot1"
        }
      }
    ],
    "source_configurations": [
      {
        "catalog": {
          "source_catalog": "mfg_paloalto",
          "postgres": {
            "slot_config": {
              "slot_name": "databricks_mfg_paloalto_slot",
              "publication_name": "mfg_paloalto_cdc_pub"
            }
          }
        }
      },
      {
        "catalog": {
          "source_catalog": "mfg_austin",
          "postgres": {
            "slot_config": {
              "slot_name": "databricks_mfg_austin_slot",
              "publication_name": "mfg_austin_cdc_pub"
            }
          }
        }
      }
    ]
  }
}

databricks pipelines create --json @ingestion.json

The top-level catalog.schema (my_catalog.mfg_ingest_metadata above) holds event logs and checkpoints for all the pipelines.
The above create command also returns a pipeline id (<ingestion-pipeline-id>), which can be used to trigger the pipeline.
Triggering a run
After creation, trigger an initial snapshot + CDC run:
databricks pipelines start-update <ingestion-pipeline-id>

The gateway pipeline should run continuously. The ingestion pipeline can be run at whatever interval you want, including continuously.
Summary checklist
Step                  Scope                        Notes
────────────────────────────────────────────────────────────────────────────
wal_level = logical   Server                       Requires reboot on RDS
Replication user      Server                       One user can serve all databases
REPLICA IDENTITY      Per table                    Set before creating the publication
Publication           Per database                 One per database is typical
Replication slot      Per database; name is        One slot per ingestion pipeline
                      server-wide
Network access        Server                       Allow Databricks cluster egress IP on port 5432
