PostgreSQL logical replication - pglogical

Debugging pglogical

Logical replication is a method that we can use to replicate data based on the concept of replica identity. REPLICA IDENTITY is a per-table setting managed through table commands (such as CREATE TABLE and ALTER TABLE); PostgreSQL uses it to record additional information in the WAL segments so it can recognize which tuples have been deleted and which have been updated. The REPLICA IDENTITY parameter can take four values (an ALTER TABLE example follows below):

  • DEFAULT
  • USING INDEX index_name
  • FULL
  • NOTHING

The concept behind logical replication is to pass the logic of the commands executed on the primary machine to the replica server, rather than an exact byte-by-byte copy of the blocks to be replicated. At the heart of logical replication there is a reverse-engineering process that, starting from the WAL segments and using logical decoding, is able to extrapolate the original SQL commands and pass them on to the replica machine.
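Setting REPLICA IDENTITY is done with ALTER TABLE; a minimal sketch illustrating the four options (the table and index names are hypothetical):

ALTER TABLE my_table REPLICA IDENTITY DEFAULT;                    -- old key values come from the primary key (the default)
ALTER TABLE my_table REPLICA IDENTITY USING INDEX my_table_uidx;  -- old key values come from a unique, NOT NULL index
ALTER TABLE my_table REPLICA IDENTITY FULL;                       -- log the whole old row
ALTER TABLE my_table REPLICA IDENTITY NOTHING;                    -- log no old-row information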

Logical replication is based on the idea that WAL segments, after being processed through a logical decoding step that converts the physical information back into logical information, are made available through a publication mechanism. The primary then starts a publication process, and the replica starts a subscription process that, by connecting to the primary's publication, is able to pass the decoded instructions directly to the query executor of the replica machine.

pglogical.create_subscription options

pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval, force_text_transfer boolean)

Parameters:

  • subscription_name - name of the subscription, must be unique
  • provider_dsn - connection string to a provider
  • replication_sets - array of replication sets to subscribe to, these must already exist, default is "{default,default_insert_only,ddl_sql}"
  • synchronize_structure - specifies whether to synchronize structure from provider to the subscriber, default false
  • synchronize_data - specifies whether to synchronize data from provider to the subscriber, default true
  • forward_origins - array of origin names to forward; currently the only supported values are an empty array, meaning don't forward any changes that didn't originate on the provider node (useful for two-way replication between the nodes), or "{all}", which means replicate all changes no matter what their origin is; default is "{all}"
  • apply_delay - how much to delay replication, default is 0 seconds
  • force_text_transfer - force the provider to replicate all columns using a text representation (which is slower, but may be used to change the type of a replicated column on the subscriber), default is false
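A minimal example call (a sketch; the subscription name and DSN values are assumptions, substitute your own connection details):

SELECT pglogical.create_subscription(
    subscription_name := 'sub_db_source',   -- hypothetical name
    provider_dsn := 'host=pg1 port=5432 dbname=db_source user=replicarole password=SuperSecret',
    replication_sets := ARRAY['default','default_insert_only','ddl_sql'],
    synchronize_structure := false,
    synchronize_data := true
);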

The local_sync_status table

  • sync_kind: d - data, s - structure
  • status
    • r (ready) - synchronization finished
    • i (initializing) - synchronization being initialized
    • d (data sync in progress) - data synchronization in progress
    • s (structure sync in progress) - structure synchronization in progress
    • f (sync failed) - synchronization failed
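A quick way to inspect it on the subscriber (a sketch):

SELECT sync_kind, sync_relname, sync_status
  FROM pglogical.local_sync_status;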

Logical replication lag

SELECT 
    slot_name,
    confirmed_flush_lsn, 
    pg_current_wal_lsn(), 
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lsn_distance
FROM pg_replication_slots;
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(
          pg_current_wal_lsn(),
          restart_lsn
       )) AS bytes_behind,
       active,
       wal_status
FROM pg_replication_slots
WHERE wal_status <> 'lost'
ORDER BY restart_lsn;

Logical replication

Starting from 9.4, there is an extension called pglogical, and starting from version 10.x, logical replication is native. In logical replication, only data manipulation language (DML) operations are replicated; data definition language (DDL) operations such as ALTER TABLE are not replicated.
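Because DDL is not replicated, schema changes have to be applied on both nodes by hand; a sketch (hypothetical column):

-- run the same statement on the publisher and on the subscriber
ALTER TABLE t1 ADD COLUMN created_at timestamptz;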

Setup replication:

Check whether there is a connection between the two servers

 pg1:~$ ping pg2
 pg2:~$ ping pg1

Create a user on both servers to manage logical replication:

CREATE USER replicarole WITH REPLICATION ENCRYPTED PASSWORD 'SuperSecret';
  • Master server – postgresql.conf
# Add settings for extensions here
listen_addresses = '*'
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
 # restart the master PostgreSQL server:
> systemctl restart postgresql
# check postgresql listening
> netstat -an | grep 5432
  • Replica server – postgresql.conf
# Add settings for extensions here
listen_addresses = '*'
wal_level = logical
max_logical_replication_workers = 4
max_worker_processes = 10
> systemctl restart postgresql
> netstat -an | grep 5432
  • Master server - pg_hba.conf file
# IPv4 local connections:
host all all 127.0.0.1/32 md5
host all replicarole 192.168.122.36/32 md5
>systemctl reload postgresql  

Logical replication setup

  • Master server
postgres=# create database db_source;
db_source=# \c db_source
db_source=# create table t1 (id integer not null primary key, name varchar(64));
#  give the REPLICAROLE user SELECT permissions
db_source=# GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicarole;
# create the publication on the master machine, indicating the list of tables that we want to replicate on the slave machine. We can also indicate all the tables, as in our example:
db_source=# CREATE PUBLICATION all_tables_pub FOR ALL TABLES;

# to replicate selected tables use:
db_source=# CREATE PUBLICATION pgbench FOR TABLE pgbench_accounts, pgbench_branches,
pgbench_tellers, pgbench_history;
  • Replica server:
postgres=# create database db_destination;
postgres=# \c db_destination
db_destination=# create table t1 (id integer not null primary key, name varchar(64));
db_destination=# select * from t1;

CREATE SUBSCRIPTION sub_all_tables CONNECTION
'user=replicarole password=SuperSecret host=pg1 port=5432
dbname=db_source' PUBLICATION all_tables_pub;
Monitoring logical replication
 db_source=# select * from pg_stat_replication ;	
 db_source=# select * from pg_publication;
 db_destination=# select * from pg_subscription;

SELECT slot.slot_name,
       slot.slot_type,
       slot.active,
       stat.application_name,
       stat.state, 
       stat.client_addr
FROM pg_replication_slots slot
JOIN pg_stat_replication stat 
ON (stat.pid = slot.active_pid);

Realign our replica server

The simplest way is to drop the subscription, truncate the table, and create the subscription again:

db_destination=# drop subscription sub_all_tables ;
db_destination=# truncate t1;
db_destination=# CREATE SUBSCRIPTION sub_all_tables CONNECTION
'user=replicarole password=SuperSecret host=pg1 port=5432 dbname=db_source'
PUBLICATION all_tables_pub;

Disabling logical replication

Suppose the master server becomes unreachable and we need to drop the subscription on the replica server.

The following command gives an error:

 db_destination=# drop subscription sub_all_tables ;

The correct steps that we have to execute are as follows:

db_destination=# alter subscription sub_all_tables disable;
ALTER SUBSCRIPTION
db_destination=# alter subscription sub_all_tables SET (slot_name = NONE);
ALTER SUBSCRIPTION
db_destination=# drop subscription sub_all_tables ;
DROP SUBSCRIPTION

These are the correct steps if we want to drop a subscription when the master server becomes unreachable. We can also use the ALTER SUBSCRIPTION sub_name DISABLE command to detach the subscription from the publication and ALTER SUBSCRIPTION sub_name ENABLE to re-attach it, as shown below.
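For example, to temporarily pause and resume replication on the subscriber:

db_destination=# alter subscription sub_all_tables disable;
db_destination=# alter subscription sub_all_tables enable;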

Replication slots

If we are using a replication slot, a master can only recycle the transaction log once it has been consumed by all replicas. The advantage here is that a slave can never fall behind so much that a resync is needed.

The trouble is, what if we shut down a replica without telling the master about it? The master would keep a transaction log forever and the disk on the primary server would eventually fill up, causing unnecessary downtime.

To reduce this risk for the master, replication slots should only be used in conjunction with proper monitoring and alerting. It is simply necessary to keep an eye on open replication slots that could potentially cause issues or might not be in use anymore.
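A monitoring query along these lines can surface slots that hold back WAL (a sketch):

SELECT slot_name, slot_type, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
  FROM pg_replication_slots
 WHERE NOT active;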

In PostgreSQL, there are two types of replication slot:

  • Physical replication slots
  • Logical replication slots
postgresql.conf:
wal_level = logical
max_replication_slots = 5 # or whatever number is needed

The max_replication_slots setting has to be changed if we are using PostgreSQL 9.6 or below. PostgreSQL 10.0 already has an improved default setting. Basically, just put in a number that serves our purpose. My recommendation is to add some spare slots so that we can easily attach more consumers without restarting the server along the way.

test=# \x
Expanded display is on.
test=# \df *create*physical*

test=# SELECT * FROM pg_create_physical_replication_slot('some_slot_name',true);
The pg_create_physical_replication_slot function creates the slot. It can be called with one or two parameters: if only a slot name is passed, the slot becomes active when it is used for the first time; if true is passed as the second parameter, the slot immediately starts to retain the transaction log.

# Show which slots are active on the master
test=# \x
Expanded display is on.
test=# SELECT * FROM pg_replication_slots;

To make use of the slot, all we have to do is add it to the configuration file:

primary_slot_name = 'some_slot_name'

Once streaming is restarted, the slot will be used directly and protect replication.

#Create logical replication slot – using plugin test_decoding to extract from wal
SELECT * FROM pg_create_logical_replication_slot('logical_slot','test_decoding');

#Show change extracted to logical slot
SELECT pg_logical_slot_get_changes('logical_slot', NULL, NULL);
 #The format that's used here depends on the output plugin we chose previously.

 #the slot does not return data anymore once it is consumed:
test=# SELECT pg_logical_slot_get_changes('logical_slot', NULL, NULL);

The result set on the second call is therefore empty. If we want to fetch data repeatedly, PostgreSQL offers the pg_logical_slot_peek_changes function. It works just like the pg_logical_slot_get_changes function but ensures that the data remains available in the slot.
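For instance, reusing the slot created above:

test=# SELECT pg_logical_slot_peek_changes('logical_slot', NULL, NULL);
-- the same rows can be fetched again later, because peek does not consume them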

Using plain SQL is, of course, not the only way to consume a transaction log. There is also a command-line tool called pg_recvlogical. It can be compared to doing tail -f on an entire database instance and receives the flow of data in real time. Let's start the pg_recvlogical command:

[hs@zenbook ~]$ pg_recvlogical -S logical_slot -P test_decoding
-d test -U postgres --start -f -

In this case, the tool connects to the test database and consumes data from logical_slot. -f - means that the stream will be sent to stdout.
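pg_recvlogical can also create and drop the slot itself; a sketch, assuming the same connection parameters:

[hs@zenbook ~]$ pg_recvlogical -d test -U postgres -S another_slot -P test_decoding --create-slot
[hs@zenbook ~]$ pg_recvlogical -d test -U postgres -S another_slot --drop-slot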

Solutions such as bi-directional replication (BDR) are totally based on logical decoding because changes at the binary level would not work with multi-master replication.

For version 10.0, the PostgreSQL community created two new commands: CREATE PUBLICATION and CREATE SUBSCRIPTION. These can be used for logical replication, which means that you can now selectively replicate data and achieve close-to-zero downtime upgrades.

Before getting started, the first thing to do is change wal_level to logical in postgresql.conf and restart:

wal_level = logical

Then, we can create a simple table:

test=# CREATE TABLE t_test (a int, b int);

To publish those changes, PostgreSQL offers the CREATE PUBLICATION command:

 test=# CREATE PUBLICATION pub1 FOR TABLE t_test;

Basically, creating a subscription directly is absolutely no problem. However, if we play this game inside the same instance, from the test database to the repl database, it is necessary to create the replication slot manually. Otherwise, CREATE SUBSCRIPTION will never finish:

test=# SELECT pg_create_logical_replication_slot('sub1', 'pgoutput');

In this case, the name of the slot that's created on the master database is called sub1. Then, we need to connect to the target database and run the following command:

repl=# CREATE SUBSCRIPTION sub1
CONNECTION 'host=localhost dbname=test user=postgres'
PUBLICATION pub1
WITH (create_slot = false);

Note that create_slot = false is only used because the test is running inside the same database server instance. If we happen to use different database servers, there is no need to manually create the slot and no need for create_slot = false.
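A quick check that changes flow (a sketch):

test=# INSERT INTO t_test VALUES (1, 2);
repl=# SELECT * FROM t_test;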

pg_replication_slot_advance()

For a logical replication slot, this function starts logical decoding and skips any changes before the specified LSN. The confirmed_flush position of the slot will be set to the value returned by the function. restart_lsn will also be moved forward if possible.

For a physical replication slot, this function updates restart_lsn.
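Calling it looks like this (a sketch; the slot name and target LSN are placeholders):

SELECT * FROM pg_replication_slot_advance('logical_slot', pg_current_wal_lsn());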

pg_replication_slot_advance() doesn't run instantly; it has to actually read and process WAL records. The pg_stat_activity query actually proves it, because wait_event is WALRead. According to the pg_replication_slots view, the current LSN of this slot is 18F/CFFFB08 and it is being advanced to 199/65915E78. That requires processing 44 GB of WAL files and, of course, it takes time.

Call pg_replication_slot_advance() from a thread (https://github.com/zalando/patroni/pull/2391, commit 4a854a7)

On busy clusters with many logical replication slots, the pg_replication_slot_advance() call affects the main HA loop and could result in member key expiration. The only way to solve this is a dedicated thread responsible for moving slots forward. The thread is started only when there are logical slots to be advanced.

Replication monitoring

Native logical replication

https://dba.stackexchange.com/questions/224490/using-postgresql-logical-replication-how-do-you-know-that-the-subscriber-is-cau https://www.postgresql.org/docs/current/logical-replication-monitoring.html

On the source side: select * from pg_stat_replication; On the destination side: select * from pg_stat_subscription; See also: Monitoring PostgreSQL Server, Postgres replication | Grafana Labs

On the publisher side there are a few things you can check:

  • pg_catalog.pg_publication;
  • pg_catalog.pg_publication_tables;
  • pg_current_wal_lsn();

Interesting information is in the view pg_catalog.pg_stat_subscription. Here the important columns are:

  • received_lsn: Last write-ahead log location received.
  • last_msg_send_time: Send time of last message received from the publisher.
  • last_msg_receipt_time: Receipt time of last message received from the publisher.
  • latest_end_lsn: Last write-ahead log location reported to the publisher.
  • latest_end_time: Time of last write-ahead log location reported to the publisher.

You have to check these columns to catch what is happening. First, check whether the two databases are in sync.

Publisher side:

test_logical_replication=> select pg_current_wal_lsn();  
 pg_current_wal_lsn  
--------------------  
 0/8EB83768     

This shows the location in the WAL file where we are now, before starting a new insert.

Let's look how pg_catalog.pg_stat_subscription values change during replication on the subscriber:

test_logical_replication_subscriber=# select received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time from pg_catalog.pg_stat_subscription;

The difference in time in the columns last_msg_send_time and last_msg_receipt_time can give information about the lag between the publisher and subscriber.
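For example (a sketch):

select subname,
       last_msg_receipt_time - last_msg_send_time AS transfer_lag
  from pg_catalog.pg_stat_subscription;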

Show lags on publisher:

On publisher(provider) side check output of this SQL(for version>=PG10) to get lags:

select   pid, client_addr, state, sync_state,  
         pg_wal_lsn_diff(sent_lsn, write_lsn) as write_lag,  
         pg_wal_lsn_diff(sent_lsn, flush_lsn) as flush_lag,  
         pg_wal_lsn_diff(sent_lsn, replay_lsn) as replay_lag
from pg_stat_replication;

pglogical extension:

https://github.com/2ndQuadrant/pglogical#subscription-management

And if you're using the pglogical extension, you can check the replication status of your tables. The following query returns tables having problems (for example, consistency problems) on the subscriber side:

SELECT sub.sub_name, sync_kind, sync_relname, sync_status 
  FROM pglogical.local_sync_status stat 
  JOIN pglogical.subscription sub ON sub.sub_id = stat.sync_subid 
 WHERE sync_status != 'r';

We can check on the subscriber that at this moment the two database are in sync, because the value returned by pg_current_wal_lsn() on the publisher matches the value in the columns received_lsn and latest_end_lsn on the subscriber:

test_logical_replication_subscriber=# select received_lsn, latest_end_lsn from pg_catalog.pg_stat_subscription;  

received_lsn    | latest_end_lsn  
----------------+------------------     
 0/8EB83768     | 0/8EB83768        

Caveats

Unlike Slony, Bucardo, or pglogical, native logical replication does not support sequences. This means that if we want to use PostgreSQL native logical replication to upgrade between major versions, we need to manually dump the most recent sequence values from the origin node and import them on the subscriber.
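A sketch of doing that by hand (hypothetical sequence name; 42 stands for whatever value the origin reported):

-- on the origin node
SELECT last_value FROM t1_id_seq;
-- on the subscriber, using the value read above
SELECT setval('t1_id_seq', 42);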

The \d command describing a table also shows information about replication of that table (the publications it belongs to).

Keys required for UPDATE and DELETE

Without a defined key (replica identity), a replicated table can only process INSERT statements; DELETE and UPDATE are not allowed.
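If a table has no primary key or suitable unique index, one workaround (a sketch) is to set its replica identity to FULL before publishing updates and deletes:

ALTER TABLE t1 REPLICA IDENTITY FULL;  -- logs the whole old row, at some WAL overhead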

Show slot changes

SELECT data FROM pg_logical_slot_get_changes('striim_slot', NULL, NULL, 'pretty-print', '1'); 
SELECT data FROM pg_logical_slot_peek_changes('striim_slot', NULL, NULL, 'pretty-print', '1');

-- Peek changes (does not consume changes)
SELECT pg_logical_slot_peek_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');

-- Get changes (consumes changes)
SELECT pg_logical_slot_get_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');

Declarative partitioning

List partitioning

DROP TABLE IF EXISTS part_tags cascade;
CREATE SEQUENCE IF NOT EXISTS part_tags_pk_seq;  -- the default below relies on this sequence
CREATE TABLE part_tags (
  pk INTEGER NOT NULL DEFAULT nextval('part_tags_pk_seq'),
  level INTEGER NOT NULL DEFAULT 0,
  tag VARCHAR (255) NOT NULL,
  primary key (pk,level)
)
PARTITION BY LIST (level);

Note that the field used to partition the data must be part of the primary key.

  1. Define the child tables
CREATE TABLE part_tags_level_0 PARTITION OF part_tags FOR VALUES IN
(0);
CREATE TABLE part_tags_level_1 PARTITION OF part_tags FOR VALUES IN
(1);
CREATE TABLE part_tags_level_2 PARTITION OF part_tags FOR VALUES IN
(2);
CREATE TABLE part_tags_level_3 PARTITION OF part_tags FOR VALUES IN
(3);
CREATE EXTENSION IF NOT EXISTS pg_trgm;  -- needed for gin_trgm_ops
CREATE INDEX part_tags_tag on part_tags using GIN (tag gin_trgm_ops);
\d part_tags;
\d part_tags_level_0;
select * from part_tags;
select * from part_tags_level_0;
select * from part_tags_level_1;
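A quick check that rows are routed to the matching partition (sample values):

INSERT INTO part_tags (level, tag) VALUES (0, 'linux'), (1, 'postgresql');
select * from part_tags_level_0;  -- only the level 0 row shows up here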

Range partitioning

DROP TABLE IF EXISTS part_tags cascade;

CREATE SEQUENCE IF NOT EXISTS part_tags_pk_seq;  -- the default below relies on this sequence
CREATE TABLE part_tags (
pk INTEGER NOT NULL DEFAULT nextval('part_tags_pk_seq'),
ins_date date not null default now()::date,
tag VARCHAR (255) NOT NULL,
level INTEGER NOT NULL DEFAULT 0,
primary key (pk,ins_date)
)
PARTITION BY RANGE (ins_date);

-- range partition bounds are inclusive at FROM and exclusive at TO,
-- so each month's partition ends at the first day of the next month
CREATE TABLE part_tags_date_01_2020 PARTITION OF part_tags FOR
VALUES FROM ('2020-01-01') TO ('2020-02-01');
CREATE TABLE part_tags_date_02_2020 PARTITION OF part_tags FOR
VALUES FROM ('2020-02-01') TO ('2020-03-01');
CREATE TABLE part_tags_date_03_2020 PARTITION OF part_tags FOR
VALUES FROM ('2020-03-01') TO ('2020-04-01');
CREATE TABLE part_tags_date_04_2020 PARTITION OF part_tags FOR
VALUES FROM ('2020-04-01') TO ('2020-05-01');
CREATE INDEX part_tags_tag on part_tags using GIN (tag gin_trgm_ops);

 \d part_tags;
 \d part_tags_date_01_2020;
 select * from part_tags;
 select * from part_tags_date_01_2020;
 select * from part_tags_date_02_2020;
 select * from part_tags_date_03_2020;

Attach a new partition

CREATE TABLE part_tags_date_05_2020 PARTITION OF part_tags FOR VALUES FROM ('2020-05-01') TO ('2020-06-01');

Detach an existing partition

 ALTER TABLE part_tags DETACH PARTITION part_tags_date_05_2020 ;

Attach an already existing table to the parent table

ALTER TABLE part_tags ATTACH PARTITION part_tags_already_exists FOR
VALUES FROM ('1970-01-01') TO ('2020-01-01');
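Note that the table being attached must already exist with a column layout matching the parent; one way to create it is a sketch using LIKE:

CREATE TABLE part_tags_already_exists (LIKE part_tags INCLUDING ALL);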