Notes on PostgreSQL triggers - krickert/search-api GitHub Wiki

Here’s a PostgreSQL trigger that fires off an event whenever a row is updated in a table. The trigger calls a function that issues a pg_notify; a listener process picks up the notification and then:

  1. Sends a REST API request
  2. Sends a gRPC message
  3. Publishes a Kafka message

The steps below give the PL/pgSQL trigger function and a Python script that listens for changes and dispatches the messages.


Step 1: Create a Table and Trigger in PostgreSQL

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT,
    content TEXT,
    updated_at TIMESTAMP DEFAULT now()
);

-- Create a table to store changes (for logging; optional only if you also remove the INSERT in notify_change)
CREATE TABLE document_changes (
    id SERIAL PRIMARY KEY,
    document_id INTEGER,
    changed_at TIMESTAMP DEFAULT now()
);

-- Create trigger function
CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
BEGIN
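    -- NEW.updated_at changes only if the UPDATE statement sets it;
    -- the column DEFAULT applies on INSERT, not on UPDATE.
    payload := json_build_object(
        'id', NEW.id,
        'updated_at', NEW.updated_at
    );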

    -- Insert into a changes table (optional)
    INSERT INTO document_changes (document_id, changed_at) VALUES (NEW.id, now());

    -- Notify a listener process (e.g., Python script) via PostgreSQL's NOTIFY
    PERFORM pg_notify('document_update', payload::text);

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create the trigger on UPDATE
CREATE TRIGGER document_update_trigger
AFTER UPDATE ON documents
FOR EACH ROW
EXECUTE FUNCTION notify_change();
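
The payload that notify_change() hands to pg_notify is a small JSON object with the keys id and updated_at. A minimal Python sketch of how a consumer might decode and sanity-check that text; the function name parse_notification and the sample values are illustrative assumptions, not part of the original setup.

```python
import json


def parse_notification(payload_text):
    """Decode the JSON text sent by notify_change() and check its shape.

    Hypothetical helper: only the key names come from the trigger's
    json_build_object call.
    """
    data = json.loads(payload_text)
    if not isinstance(data.get("id"), int):
        raise ValueError("payload has no integer 'id': %r" % (payload_text,))
    # updated_at is kept as text (or None); the listener passes it on unchanged.
    return {"id": data["id"], "updated_at": data.get("updated_at")}


# Sample text shaped like the trigger's output (made-up values).
sample = '{"id" : 1, "updated_at" : "2024-01-01T12:00:00"}'
parsed = parse_notification(sample)
print(parsed)
```

Rejecting malformed payloads early keeps a bad message from reaching all three downstream channels.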

Step 2: Create a Python Listener for Dispatching Events

This script listens for pg_notify messages and sends updates via:

  • REST API (HTTP POST)
  • gRPC (to a microservice)
  • Kafka (publishes to a topic)

Install Dependencies:

pip install psycopg2 grpcio grpcio-tools confluent-kafka requests

Python Script (listener.py)

import psycopg2
import psycopg2.extensions
import json
import requests
import grpc
from confluent_kafka import Producer

# REST API endpoint
REST_URL = "http://localhost:5000/document_update"

# Kafka Producer Configuration
kafka_config = {"bootstrap.servers": "localhost:9092"}
producer = Producer(kafka_config)
KAFKA_TOPIC = "document_updates"

# gRPC Client (Assume we have a gRPC service)
import update_pb2
import update_pb2_grpc

channel = grpc.insecure_channel("localhost:50051")
grpc_client = update_pb2_grpc.DocumentUpdateStub(channel)

# Connect to PostgreSQL and listen for updates
conn = psycopg2.connect("dbname=mydb user=myuser password=mypass host=localhost")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute("LISTEN document_update;")
print("Listening for document updates...")

def send_rest_update(payload):
    """Send document update via REST API"""
    try:
        # Time out so a hung endpoint cannot block the listener indefinitely
        response = requests.post(REST_URL, json=payload, timeout=5)
        print("REST API Response:", response.status_code)
    except Exception as e:
        print("REST Error:", e)

def send_kafka_update(payload):
    """Send document update via Kafka"""
    try:
        producer.produce(KAFKA_TOPIC, key=str(payload["id"]), value=json.dumps(payload))
        producer.flush()
        print("Sent to Kafka:", payload)
    except Exception as e:
        print("Kafka Error:", e)

def send_grpc_update(payload):
    """Send document update via gRPC"""
    try:
        request = update_pb2.UpdateRequest(id=payload["id"], timestamp=str(payload["updated_at"]))
        # Deadline in seconds so an unreachable server fails fast
        response = grpc_client.NotifyUpdate(request, timeout=5)
        print("gRPC Response:", response)
    except Exception as e:
        print("gRPC Error:", e)

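# Listen for PostgreSQL notifications
import select  # stdlib; used to wait on the connection's socket

while True:
    # Block for up to 5 seconds until the connection has data, instead of
    # polling in a tight loop that would pin a CPU core.
    if select.select([conn], [], [], 5) == ([], [], []):
        continue
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        payload = json.loads(notify.payload)
        print("Received update:", payload)

        # Dispatch updates
        send_rest_update(payload)
        send_kafka_update(payload)
        send_grpc_update(payload)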
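
The dispatch step calls the three senders one after another, and each sender swallows its own errors. A minimal sketch of the same fan-out with the senders passed in as a list, so that failures are isolated in one place and reported back to the caller; the name dispatch and the stand-in senders are assumptions for illustration, not part of listener.py.

```python
def dispatch(payload, senders):
    """Call every sender with the payload; return {sender_name: error} for failures."""
    failures = {}
    for send in senders:
        try:
            send(payload)
        except Exception as exc:  # keep going so the other channels still fire
            failures[send.__name__] = exc
    return failures


# Stand-in senders (hypothetical) so the sketch runs without any services.
received = []


def fake_rest(payload):
    received.append(("rest", payload["id"]))


def fake_kafka(payload):
    raise ConnectionError("broker unavailable")


def fake_grpc(payload):
    received.append(("grpc", payload["id"]))


failures = dispatch(
    {"id": 1, "updated_at": "2024-01-01T12:00:00"},
    [fake_rest, fake_kafka, fake_grpc],
)
print(received)
print(failures)
```

Returning the failures instead of only printing them would let the caller decide whether to retry or log the event somewhere durable.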

Step 3: Set Up the gRPC Service (Optional)

If you want to handle gRPC updates, here’s an example gRPC service.

gRPC Proto File (update.proto)

syntax = "proto3";

service DocumentUpdate {
    rpc NotifyUpdate(UpdateRequest) returns (UpdateResponse);
}

message UpdateRequest {
    int32 id = 1;
    string timestamp = 2;
}

message UpdateResponse {
    string message = 1;
}
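
Both listener.py and grpc_server.py import update_pb2 and update_pb2_grpc, which are generated from update.proto. A sketch of generating them from Python, assuming the grpcio-tools package is installed and update.proto sits in the current directory; the helper names protoc_args and generate_stubs are hypothetical.

```python
def protoc_args(proto_file="update.proto", proto_dir=".", out_dir="."):
    """Build the argument list for grpc_tools.protoc.

    The first item is only a program-name placeholder, as with sys.argv.
    """
    return [
        "grpc_tools.protoc",
        f"-I{proto_dir}",
        f"--python_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        proto_file,
    ]


def generate_stubs(**kwargs):
    """Run the bundled protoc; returns its exit code (0 on success)."""
    # Imported lazily so this module loads even without grpcio-tools installed.
    from grpc_tools import protoc

    return protoc.main(protoc_args(**kwargs))


print(" ".join(protoc_args()))
```

Calling generate_stubs() in the directory that holds update.proto should leave update_pb2.py and update_pb2_grpc.py next to it.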

gRPC Server (grpc_server.py)

import grpc
from concurrent import futures
import update_pb2
import update_pb2_grpc

class UpdateService(update_pb2_grpc.DocumentUpdateServicer):
    def NotifyUpdate(self, request, context):
        print(f"Received gRPC update: ID {request.id}, Timestamp {request.timestamp}")
        return update_pb2.UpdateResponse(message="Update received!")

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    update_pb2_grpc.add_DocumentUpdateServicer_to_server(UpdateService(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    print("gRPC server running on port 50051...")
    server.wait_for_termination()

if __name__ == "__main__":
    serve()

Step 4: Run the Components

  1. Start PostgreSQL and Create the Table & Trigger (save the Step 1 SQL as create_trigger.sql)

    psql -d mydb -U myuser -f create_trigger.sql
    
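  2. Generate the gRPC Stubs (requires the grpcio-tools package) and Start the gRPC Server

    python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. update.proto
    python grpc_server.py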
    
  3. Start the Listener

    python listener.py
    
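  4. Insert a Row, Then Trigger an Update in PostgreSQL

    INSERT INTO documents (title, content) VALUES ('First document', 'Old content');
    UPDATE documents SET content = 'New content', updated_at = now() WHERE id = 1;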
    

You should see:

  • A REST API request sent to http://localhost:5000/document_update (the listener prints a REST error if nothing is serving that URL)
  • A Kafka message published to the document_updates topic
  • A gRPC notification received by the gRPC server
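
The listener posts to http://localhost:5000/document_update, but no receiving service is shown. A minimal standard-library sketch of such a receiver, handy for checking the REST leg locally; the handler class, the in-memory received_updates list, and the response body are assumptions, and a real service would likely use a web framework.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received_updates = []  # in-memory log of payloads, for demonstration only


class UpdateHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        if self.path != "/document_update":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length))
        except ValueError:  # covers malformed JSON and undecodable bytes
            self.send_error(400, "invalid JSON")
            return
        received_updates.append(payload)
        body = json.dumps({"status": "ok"}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # silence the default per-request logging


def make_server(host="127.0.0.1", port=5000):
    """Create (but do not start) the receiver; call serve_forever() to run it."""
    return HTTPServer((host, port), UpdateHandler)
```

Running make_server().serve_forever() in a separate terminal before starting listener.py should make the REST call return status 200.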

Conclusion

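With this setup, any update to the documents table produces a notification that the listener forwards over REST, gRPC, and Kafka. pg_notify delivers each message when the updating transaction commits, and only to sessions that are listening at that time, so events raised while the listener is down are not redelivered. Payloads must also stay under 8000 bytes in the default configuration, which is why the trigger sends only the id and timestamp.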
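
One way to recover events a stopped listener missed is to replay rows from document_changes on startup. A sketch, assuming a DB-API cursor such as psycopg2's and a hypothetical last_seen_id that the listener has stored somewhere; fetch_missed_changes is an illustrative name, and the fake cursor exists only so the example runs without a database.

```python
def fetch_missed_changes(cur, last_seen_id):
    """Return (change_id, document_id) rows logged after last_seen_id, oldest first."""
    cur.execute(
        "SELECT id, document_id FROM document_changes WHERE id > %s ORDER BY id",
        (last_seen_id,),
    )
    return cur.fetchall()


class FakeCursor:
    """Stand-in for a database cursor; mimics the query above in memory."""

    def __init__(self, rows):
        self._rows = rows
        self._result = []

    def execute(self, query, params):
        (last_seen_id,) = params
        self._result = sorted(r for r in self._rows if r[0] > last_seen_id)

    def fetchall(self):
        return list(self._result)


cur = FakeCursor([(1, 10), (2, 11), (3, 10)])
missed = fetch_missed_changes(cur, last_seen_id=1)
print(missed)
```

Each returned document_id could then be pushed through the same dispatch path as a live notification.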