postgres logical replication decoding plugins wal2json

The following statement creates a logical replication slot named test_slot that uses the wal2json output plugin.

SELECT 'start' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

Consume changes

There are two ways to obtain the changes (JSON objects) from the wal2json plugin:

  • calling functions via SQL or
  • pg_recvlogical.

Using SQL function

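Run the following query to read the changes that have accumulated in the slot. Note that pg_logical_slot_get_changes consumes the changes it returns; use pg_logical_slot_peek_changes to read them without consuming them.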

SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', 'on');

Using pg_recvlogical

To test the plugin, run pg_recvlogical from the command line. The command is included with a PostgreSQL installation. Use the host and port of your deployment, and the database and slot name you created through the API:

PGSSLMODE=require pg_recvlogical -d <DATABASE NAME> -U repl -h <HOST> -p <PORT> --slot <SLOT NAME> --start -o pretty-print=1 -f -

wal2json has several limitations when used with PostgreSQL, mainly related to how it processes large transactions and to the nature of logical decoding:

  1. Large Transactions:
  • Memory Limit: By default, wal2json processes an entire transaction at once. If a transaction is larger than 1 GB (or the configured memory limit), decoding can fail with out-of-memory errors, according to PostgreSQL.
  • Chunking: To address this, wal2json offers the write-in-chunks option. However, with this option each individual chunk is not well-formed JSON, so clients must stitch the chunks together before parsing.
  • Performance Degradation: Very large transactions can also degrade wal2json performance significantly, with throughput decreasing as the number of processed changes increases, according to GitHub.
  2. Logical Decoding Restrictions:
  • Excluded Operations: Logical decoding, which wal2json relies on, does not replicate everything. Excluded items include schema/DDL changes, sequences, TRUNCATE statements, and Large Objects, according to IBM. Note that PostgreSQL 11 and later can decode TRUNCATE.
  • TOAST Columns: Logical decoding may omit unchanged TOAST columns (e.g., long text fields) for performance reasons.
  • Overhead: Setting wal_level to logical, which wal2json requires, increases the volume of WAL that is generated, potentially requiring more disk space.
  3. Other Considerations:
  • HA Switchover: During a controlled HA switchover, replication events may be delivered multiple times. Downstream applications must be able to handle duplicates.
  • Replication Slots: If a logical replication slot exists but no consumer reads its changes, PostgreSQL keeps retaining WAL for it, which can exhaust disk space.
  • Monitoring: Monitor replication slots to ensure they are not retaining excessive WAL and to check that they have active consumers.
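Because events can be delivered more than once, a consumer usually needs to be idempotent. One common approach, sketched below in Python, is to remember the last LSN that was applied and skip anything at or below it. The names parse_lsn and LsnDeduplicator are hypothetical helpers, not part of wal2json. The sketch assumes each message carries the lsn value shown in the slot output and that LSNs remain comparable across the switchover.

```python
def parse_lsn(text):
    """Convert a PostgreSQL LSN such as '0/24F18D8' to an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


class LsnDeduplicator:
    """Skip messages whose LSN was already applied (hypothetical helper)."""

    def __init__(self, last_applied=0):
        # In a real consumer this value would be loaded from durable storage.
        self.last_applied = last_applied

    def should_apply(self, lsn_text):
        lsn = parse_lsn(lsn_text)
        if lsn <= self.last_applied:
            return False  # duplicate or replayed message
        self.last_applied = lsn
        return True


dedup = LsnDeduplicator()
# The third entry simulates a message that is delivered a second time.
incoming = ["0/24F18D8", "0/24F1940", "0/24F1940", "0/24F1A48"]
applied = [lsn for lsn in incoming if dedup.should_apply(lsn)]
print(applied)  # ['0/24F18D8', '0/24F1940', '0/24F1A48']
```

In practice the last applied LSN would be stored in the same transaction as the applied change, so that a crash between the two cannot cause a change to be skipped or applied twice.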

Format version 1 writes the entire transaction to a single buffer, and that buffer has a 1 GB limit. Large transactions can therefore fail with errors or decode slowly.
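When format version 1 is used with write-in-chunks, the pieces of one transaction can be stitched back together on the client. The Python sketch below assumes that each row read from the slot is an (xid, data) pair, that all chunks of a transaction share the same xid and arrive consecutively, and that concatenating them in order yields a well-formed JSON document. These assumptions should be checked against the actual output of your wal2json version. stitch_chunks is a hypothetical helper name, and the sample chunks are illustrative rather than captured output.

```python
import json
from itertools import groupby


def stitch_chunks(rows):
    """Yield (xid, parsed_transaction) from chunked format-version 1 rows.

    rows: iterable of (xid, data) tuples in the order the slot returned them.
    Assumption: the chunks of one transaction are consecutive and their
    concatenation is valid JSON.
    """
    for xid, group in groupby(rows, key=lambda row: row[0]):
        text = "".join(data for _, data in group)
        yield xid, json.loads(text)


# Illustrative chunks (not captured from a real server).
rows = [
    (6152, '{"change":['),
    (6152, '{"kind":"insert","table":"inventory"}'),
    (6152, ',{"kind":"delete","table":"inventory"}'),
    (6152, ']}'),
    (6153, '{"change":['),
    (6153, ']}'),
]
stitched = list(stitch_chunks(rows))
for xid, txn in stitched:
    print(xid, len(txn["change"]))
```

Joining the chunks still builds the whole transaction in client memory, so for very large transactions a streaming parser or format version 2 may be a better fit.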

Format versions: 1 vs 2

The wal2json plugin supports two different output format versions.

The default format version is 1, which produces a JSON object per transaction. All new and old tuples are available within this single JSON object. This format is useful when you need to process entire transactions as atomic units.

Format version 2 produces a JSON object per tuple (row), with optional JSON objects for the beginning and end of each transaction. This format is more granular and can be useful when you need to process changes on a row-by-row basis. Both formats support various options to include additional properties such as transaction timestamps, schema-qualified names, data types, and transaction IDs.

To use format version 2, you need to specify it explicitly:

SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2');
-- Insert two rows, then read the resulting changes in format version 2:
INSERT INTO inventory (product_name, quantity) VALUES
    ('Widget D', 200),
    ('Gizmo E', 75);
SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', 'on', 'format-version', '2');
    lsn    | xid  | data
-----------+------+----------------
 0/24F18D8 | 3078 | {"action":"B"}
 0/24F1940 | 3078 | {"action":"I","schema":"public","table":"inventory","columns":[{"name":"id","type":"integer","value":8},{"name":"product_name","type":"character varying(100)","value":"Widget D"},{"name":"quantity","type":"integer","value":200},{"name":"last_updated","type":"timestamp without time zone","value":"2024-07-30 10:27:45.428407"}]}
 0/24F1A48 | 3078 | {"action":"I","schema":"public","table":"inventory","columns":[{"name":"id","type":"integer","value":9},{"name":"product_name","type":"character varying(100)","value":"Gizmo E"},{"name":"quantity","type":"integer","value":75},{"name":"last_updated","type":"timestamp without time zone","value":"2024-07-30 10:27:45.428407"}]}
 0/24F1B10 | 3078 | {"action":"C"}
(4 rows)

Version 2 is more granular and allows changes to be processed row by row. In the output above, the action field identifies each message:

  • B: begin
  • I: insert
  • C: commit

In version 1, the same transaction would be emitted as a single JSON object containing both inserts.
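A consumer that reads format version 2 can use the B and C messages to regroup row-level messages into transactions. The following Python sketch illustrates one way to do that. The function names are hypothetical, the sample messages are the ones shown above with the last_updated column trimmed for brevity, and the sketch assumes that begin and commit messages are present in the stream.

```python
import json


def group_transactions(messages):
    """Group format-version 2 messages into per-transaction lists of changes."""
    current = None
    for raw in messages:
        msg = json.loads(raw)
        action = msg["action"]
        if action == "B":      # begin: start collecting changes
            current = []
        elif action == "C":    # commit: hand the collected changes over
            if current is not None:
                yield current
            current = None
        elif current is not None:
            current.append(msg)


def row_as_dict(change):
    """Turn the 'columns' array of a change into a {name: value} dict."""
    return {col["name"]: col["value"] for col in change.get("columns", [])}


messages = [
    '{"action":"B"}',
    '{"action":"I","schema":"public","table":"inventory","columns":['
    '{"name":"id","type":"integer","value":8},'
    '{"name":"product_name","type":"character varying(100)","value":"Widget D"},'
    '{"name":"quantity","type":"integer","value":200}]}',
    '{"action":"I","schema":"public","table":"inventory","columns":['
    '{"name":"id","type":"integer","value":9},'
    '{"name":"product_name","type":"character varying(100)","value":"Gizmo E"},'
    '{"name":"quantity","type":"integer","value":75}]}',
    '{"action":"C"}',
]

transactions = list(group_transactions(messages))
for changes in transactions:
    for change in changes:
        print(change["action"], change["table"], row_as_dict(change))
```

Collecting a transaction before applying it restores the atomic view that version 1 provides, while still letting the client decide how much to hold in memory.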

Version 1:

  • Outputs a single JSON object per transaction.
  • Includes all new and old tuples within that JSON object.
  • Useful when processing entire transactions as atomic units.

Version 2:

  • Outputs a JSON object for each tuple (row).
  • Includes optional JSON objects for the beginning and end of each transaction.
  • More granular, allowing for row-by-row processing of changes.

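Because version 2 emits each change as its own message instead of assembling one large JSON object per transaction, it can decode faster and keep the slot at its peak size for a shorter time in some cases.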

REPLICA IDENTITY

REPLICA IDENTITY is a table property that determines what information is written to the WAL when a row is updated or deleted. This property is crucial for wal2json when working with tables that don't have a primary key.

REPLICA IDENTITY has four possible settings:

  • DEFAULT: Only primary key columns are logged for UPDATE and DELETE operations.
  • USING INDEX: A specified index's columns are logged for UPDATE and DELETE operations.
  • FULL: All columns are logged for UPDATE and DELETE operations.
  • NOTHING: No information is logged for UPDATE and DELETE operations.

Tables use the DEFAULT setting by default. For tables without a primary key, this means no information is logged for updates and deletes.
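The rules above can be summarized as a small lookup. The Python sketch below is a simplified model of the list, not PostgreSQL code: logged_old_columns is a hypothetical function, and the column names are illustrative. It returns the columns whose old values are available to identify a row for UPDATE and DELETE under each setting.

```python
def logged_old_columns(identity, all_columns, primary_key=(), index_columns=()):
    """Return the columns whose old values are logged for UPDATE and DELETE.

    Simplified model of the REPLICA IDENTITY settings described above.
    """
    if identity == "DEFAULT":
        return list(primary_key)  # empty when the table has no primary key
    if identity == "USING INDEX":
        return list(index_columns)
    if identity == "FULL":
        return list(all_columns)
    if identity == "NOTHING":
        return []
    raise ValueError(f"unknown REPLICA IDENTITY setting: {identity}")


columns = ["product_name", "quantity", "price"]  # a table without a primary key
print(logged_old_columns("DEFAULT", columns))    # []
print(logged_old_columns("FULL", columns))       # ['product_name', 'quantity', 'price']
```

An empty result means the old row cannot be identified, which is why updates and deletes on such a table carry no usable information.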

CREATE TABLE products_no_pk (
    product_name VARCHAR(100),
    quantity INTEGER,
    price DECIMAL(10, 2)
);

INSERT INTO products_no_pk (product_name, quantity, price) VALUES ('Widget', 100, 19.99);
UPDATE products_no_pk SET quantity = 90 WHERE product_name = 'Widget';

The wal2json output for this update operation will not contain any information about the updated row due to the lack of a primary key and the DEFAULT REPLICA IDENTITY setting.

WARNING:  table "products_no_pk" without primary key or replica identity is nothing
    lsn    | xid  |        data
-----------+------+---------------------
 0/256D6C8 | 6151 | {                  +
           |      |         "change": [+
           |      |         ]          +
           |      | }
(1 row)

To capture changes for tables without a primary key, we can change the REPLICA IDENTITY to FULL:

ALTER TABLE products_no_pk REPLICA IDENTITY FULL;
UPDATE products_no_pk SET price = 21.99 WHERE product_name = 'Widget';

Now, the wal2json output will include both the old and new values for all columns, which can be used to identify the changed row.

SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', 'on');
    lsn    | xid  |                                                data
-----------+------+-----------------------------------------------------------------------------------------------------
 0/256E228 | 6152 | {                                                                                                  +
           |      |         "change": [                                                                                +
           |      |         ]                                                                                          +
           |      | }
 0/256E310 | 6153 | {                                                                                                  +
           |      |         "change": [                                                                                +
           |      |                 {                                                                                  +
           |      |                         "kind": "update",                                                          +
           |      |                         "schema": "public",                                                        +
           |      |                         "table": "products_no_pk",                                                 +
           |      |                         "columnnames": ["product_name", "quantity", "price"],                      +
           |      |                         "columntypes": ["character varying(100)", "integer", "numeric(10,2)"],     +
           |      |                         "columnvalues": ["Widget", 90, 21.99],                                     +
           |      |                         "oldkeys": {                                                               +
           |      |                                 "keynames": ["product_name", "quantity", "price"],                 +
           |      |                                 "keytypes": ["character varying(100)", "integer", "numeric(10,2)"],+
           |      |                                 "keyvalues": ["Widget", 90, 19.99]                                 +
           |      |                         }                                                                          +
           |      |                 }                                                                                  +
           |      |         ]                                                                                          +
           |      | }
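With REPLICA IDENTITY FULL, the oldkeys section holds the complete old row, so a consumer can work out which columns actually changed. The Python sketch below shows one way to do this for a format version 1 change. changed_columns is a hypothetical helper, and the input is the update from the output above.

```python
def changed_columns(change):
    """Return {column: (old, new)} for columns whose value differs."""
    new = dict(zip(change["columnnames"], change["columnvalues"]))
    old_keys = change.get("oldkeys", {})
    old = dict(zip(old_keys.get("keynames", []), old_keys.get("keyvalues", [])))
    return {
        name: (old[name], value)
        for name, value in new.items()
        if name in old and old[name] != value
    }


change = {
    "kind": "update",
    "schema": "public",
    "table": "products_no_pk",
    "columnnames": ["product_name", "quantity", "price"],
    "columntypes": ["character varying(100)", "integer", "numeric(10,2)"],
    "columnvalues": ["Widget", 90, 21.99],
    "oldkeys": {
        "keynames": ["product_name", "quantity", "price"],
        "keytypes": ["character varying(100)", "integer", "numeric(10,2)"],
        "keyvalues": ["Widget", 90, 19.99],
    },
}
diff = changed_columns(change)
print(diff)  # {'price': (19.99, 21.99)}
```

Under other REPLICA IDENTITY settings, oldkeys contains only the identifying columns, so the result covers just those columns.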