Postgres Listen Notify - bcgov/common-service-showcase GitHub Wiki
Postgres Listen Notify
The Postgres database has the ability to support the publish and subscribe pattern which is useful for situations where you would need a job server. Note that this implementation is Postgres specific, meaning that more abstract libraries like Knex do not natively support this.
Prerequisites
Assuming you have a need for a job queue, you will want to ensure that you have a table that serves as the queue defined. Ideally, you will want to ensure that your application logic uses FOR UPDATE SKIP LOCKED
so that they can atomically acquire and claim a job and prevent other instances from getting the same task (and thus duplicating the same work).
Schema Changes
Once you have the logic in place to atomically enqueue and dequeue jobs, you will need to perform a database migration so that your queue table can trigger notifications. For example, in a simple enqueue/dequeue queue, you would ideally want a notification triggered when there is either an INSERT or UPDATE operation done on your queue table. This can be achieved with the following knex snippet for example:
.then(() => knex.schema.raw(`CREATE OR REPLACE FUNCTION queue.object_queue_notify()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('object_queue_channel', NULL);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER queue_object_queue_trigger
AFTER INSERT OR UPDATE ON queue.object_queue
FOR EACH ROW
EXECUTE PROCEDURE queue.object_queue_notify();
`))
The above defines a trigger and a function. The trigger, named queue_object_queue_trigger
will act immediately after an insert or update operation has occured on the object_queue
table and invoke the queue.object_queue_notify
function. The function does a simple pg_notify
call in order to send out a notification to the object_queue_channel
channel with a null payload. With the trigger and function in place, this allows any application which is listening to the object_queue_channel
to be notified as soon as there is any state change to the object_queue
table.
Application Listener
Knex Variant
In order to "subscribe" to the object_queue_channel
mentioned earlier, you will need to run the LISTEN object_queue_channel
SQL command. This will make the current database connection effectively subscribe to that named channel and wait for messages to appear. This can be achieved loosely with the following code assuming the use of a knex pool:
function queueListener() {
const interval = 10000;
const notifyId = setInterval(() => {
dataConnection.knex.client.acquireRawConnection().then(connection => {
connection.query('LISTEN object_queue_channel');
connection.on('notification', (msg) => {
log.debug('Notification Received', msg);
queueManager.doJob(); // Do whatever work is needed to process this job
});
connection.on('end', () => {
log.warn('Connection Ended');
setTimeout(queueListener, interval); // Attempt to reconnect and resubscribe
});
connection.on('error', (err) => {
log.warn('Connection Error', err);
setTimeout(queueListener, interval); // Attempt to reconnect and resubscribe
});
log.info('Listening for jobs');
clearInterval(notifyId); // Remove temporary retry interval as a connection is live here
});
}, interval);
}
Note that as knex does not support this Postgres specific feature, we need to unwrap and depend on the underlying pg driver library in order to achieve a proper event-driven listen. While it is possible to spin up a standalone connection directly with the pg library, you lose the ability to allow knex to manage all of the database connections in the pool. The above example where we acquireRawConnection()
allows us the best of both worlds by remaining in the knex pool, but still allows us to run what we need to out of band.
- https://github.com/knex/knex/issues/285#issuecomment-491615555
- https://github.com/knex/knex/issues/5164#issuecomment-1524908905
PG Variant
The following is an extremely bare-bones approach for listening to the same channel, but leveraging the baseline pg library instead of knex. We do not generally advocate using this pattern as the knex pool is much more versatile, but it is documented here for reference.
async function notify() {
await client.connect();
client.query('LISTEN object_queue_channel');
client.on('notification', (msg) => {
log.debug('Notification Received', msg);
queueManager.doJob(); // Do whatever work is needed to process this job
})
// Add 'end' and 'error' event handlers as appropriate
};
Errata
The publish and subscribe model offered by Postgres' Listen and Notify commands are very versatile, and solves many common job queue problems. However, due to the existing requirements to ensure that our database connections do not persist in the event they become read-only (likely due to pod shuffle or cluster failover), as well as needing to poll the database anyways, we opted to not pursue this further in the meantime for COMS as it does not offer too much query savings to begin with.
We do suspect that this pattern may have some consideration for use in CHES as a potential option to condense and simplify the architecture stack to just Postgres. However, more consideration will need to be done for CHES as its job queue is a more complicated priority queue, whereas the COMS example we have provided in this article encapsulates a simple non-prioritized queue.