We're Sequin, a Postgres CDC tool that streams changes to streams and queues like Kafka, SQS, HTTP endpoints, and more. Like any Postgres maxi, we tend to find ourselves down a Postgres rabbit hole a couple of times per month. This is one of those times :)
Who doesn't like to go a little overboard with Postgres? Extensions like pgsql-http push what we think is possible (or permissible) to do in the database.
I wondered the other day if you could build a request-reply mechanism using plain Postgres components. Turns out, you can! Whether or not you should is left as a decision for the reader.
Along the way, I'll pull in a bunch of Postgres tools we rarely get a chance to use, like unlogged tables, advisory locks, and procedures.
Pub-sub vs request vs request-reply
Postgres has support for pub-sub with its LISTEN/NOTIFY feature. But pub-sub is limited: the sender doesn't know if any processes are around to handle its notification, and the sender can't receive a reply.
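As a quick refresher, the basic LISTEN/NOTIFY flow looks like this (the channel name is just an example):

```sql
-- session A: subscribe to a channel
listen my_channel;

-- session B: broadcast a payload to every current listener
notify my_channel, 'hello';

-- equivalently, using the function form (handy inside functions)
select pg_notify('my_channel', 'hello');
```

If no session is listening on my_channel, the notification is simply dropped.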
One layer up from the pub-sub pattern is the request pattern. In a request, the sender should raise an error if there are no handlers. This gives the sending process firmer guarantees that its request will be handled. You can also specify that only one handler should receive the request.
Finally, in the request-reply pattern, the sender blocks until it receives a response from the handler. This gives the sender the most guarantees, as it knows for sure whether or not the request was successfully handled. And, the sender can receive a response payload, which it might use to continue its operation.
Why doesn't Postgres have request or request-reply?
Before building these things in Postgres, it's worth asking why they don't come packaged with the database. The reasons are reasonable 😄
Pub-sub is a great fit for a database system. Publishing adds very little overhead to transactions, as it's a non-blocking broadcast.
Requests seem like they'd be a nice improvement to pub-sub. After all, what's the point of publishing if nothing is around to hear it?
The issue with requests is that you're coupling transactions to the lifecycle of an external service. If that service is down, your requests will fail, and so your transactions will grind to a halt.
So, the correct design pattern here is to treat NOTIFY as an optimization, not as critical communication. If your service really needs to know about database changes, it should have some other primary way of detecting them. It can poll the database, for example. Then, you can layer NOTIFY on top to reduce polling frequency and learn about changes instantly – a great optimization.
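As a sketch of that pattern, a trigger can broadcast new row ids while a polling worker remains the source of truth. The orders table and new_orders channel here are hypothetical:

```sql
-- broadcast the new row's id after each insert; a polling worker still
-- picks up anything missed if no listener is connected when this fires
create or replace function notify_new_order() returns trigger as $$
begin
  perform pg_notify('new_orders', NEW.id::text);
  return NEW;
end;
$$ language plpgsql;

create trigger orders_notify
after insert on orders
for each row execute function notify_new_order();
```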
Request-reply is where things really get out of hand. If this is happening inside of a transaction, you're now blocking a process – a precious resource in Postgres – on a reply. What if the service takes 30 seconds to respond?
When might you use request/request-reply?
There are two situations where these patterns in Postgres aren't so crazy:
Service-to-service communication in a radically simple setup
Maybe you want to keep your stack as simple as possible. You don't want or need to add more infrastructure to do service-to-service communication.
With this approach, your app instances just need to connect to Postgres and they have everything they need to do their jobs and coordinate.
You're building a tool with a Postgres interface
Think of a tool like Supabase or the aforementioned pgsql-http, where there's a lot of application logic happening inside Postgres. The more logic that lives in Postgres instead of your application, the more you'll be tempted to reach for power features like this.
The code
I built out a proof-of-concept, which you can check out on GitHub.
How it works
The sender
As you'll see, using a table is key to (1) overcoming limitations with payload size and (2) providing a channel for the requester to receive a reply.
The request_reply table:
create type request_reply_state as enum ('sending', 'processing', 'replied');

create unlogged table request_reply (
  id serial primary key,
  channel text not null,
  request text not null,
  response text,
  state request_reply_state not null default 'sending'
);
channel is the channel to use in LISTEN/NOTIFY. request is the payload of the request. response will be used for the payload of the response.
Finally, a great use case for unlogged tables! Unlogged tables in Postgres are far more efficient to write to, as they don't write to the WAL. But they're not crash-safe, which limits their use cases. For temporary data like our request-reply mechanism, they're a great fit.
After inserting into the request_reply table, you can emit a NOTIFY message to get a handler to respond. The NOTIFY will broadcast on the channel specified. The body of the NOTIFY will be the id for the request_reply entry.
You might be tempted to insert into request_reply, emit the NOTIFY message, then await the response all in the same query/function call. However, you need to commit the row first so that it is visible to other sessions. After you commit, Postgres will send your NOTIFY to the handlers. Then you can block, awaiting the response.
Later, in "See it in action", I describe how you can use a procedure to turn the operation into a one-liner.
So, you can use two function calls: request() and then await_reply().
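Assuming autocommit – so the insert from request() commits as soon as the first statement returns – the sender side looks like two statements (the id value of 1 is illustrative):

```sql
-- enqueue the request; the insert commits when the statement completes,
-- and the NOTIFY is delivered on commit
select request('embeddings', 'he who controls the spice controls the universe.');

-- block until a handler replies, passing the id returned above
select await_reply(1);
```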
Before inserting and broadcasting, Postgres can check if anyone is listening. If not, Postgres should raise:
create or replace function request(p_channel text, p_request text)
returns int as $$
declare
  v_id int;
begin
  -- Check if anyone is listening on the channel
  if not exists (
    select 1 from pg_stat_activity
    where wait_event_type = 'Client'
      and wait_event = 'ClientRead'
      and lower(query) like '%listen%' || lower(p_channel) || '%'
  ) then
    raise exception 'No listeners on channel `%`', p_channel;
  end if;

  -- Insert the request and get the ID
  insert into request_reply (channel, request)
  values (p_channel, p_request)
  returning id into v_id;

  -- Notify listeners
  -- Postgres sends after the commit completes
  perform pg_notify(p_channel, v_id::text);

  return v_id;
end;
$$ language plpgsql;
Then, the sender needs to block and await the reply. To do so, you can poll the request_reply table while waiting, checking if the state has transitioned to replied. An optimization is to use advisory locks:
- Poll the table until the handler has picked up the message (request_reply.state != 'sending').
- Then, move from polling to trying to acquire an advisory lock. As you'll see, when the handler picks up the message, it will acquire a lock.
- Block until the handler releases the lock.
Again, this is an optimization that ensures the polling period is brief. Advisory locks release instantly, so the sender will be able to immediately see the reply when it's ready.
Here's await_reply(), which accepts the id of the request_reply entry:
create or replace function await_reply(v_id int)
returns text as $$
declare
  v_response text;
begin
  -- Wait for the response
  loop
    -- Check if the state has changed from 'sending'
    if exists (select 1 from request_reply where id = v_id and state != 'sending') then
      -- Try to acquire the advisory lock
      if pg_try_advisory_lock(v_id) then
        -- Lock acquired, fetch the response and delete the row
        delete from request_reply where id = v_id returning response into v_response;
        -- Release the lock
        perform pg_advisory_unlock(v_id);
        return v_response;
      end if;
    end if;
    -- Wait a bit before trying again
    perform pg_sleep(0.1);
  end loop;
end;
$$ language plpgsql;
When the handler releases the lock, the sender can continue. The sender runs a delete query to remove the message from request_reply and retrieve the payload.
The handler
In your application code, you can register a listener for the NOTIFY broadcast. When it receives a message, it:
- Opens a transaction.
- Runs an update ... returning to retrieve the message, setting the state to processing. It simultaneously acquires an advisory lock.
- Processes the request.
- Runs a final update, setting the response.
- Releases the advisory lock.
Here's what the first update query looks like:
begin;

with available_message as (
  select id, request
  from request_reply
  where id = $1 and state = 'sending'
  order by id
  for update skip locked
  limit 1
)
update request_reply r
set state = 'processing'
from available_message am
where r.id = am.id
returning r.request, pg_try_advisory_lock(r.id) as lock_acquired;

-- commit so the 'processing' state is visible to the polling sender;
-- the advisory lock is session-level, so it survives the commit
commit;
The available_message CTE selects the message for update skip locked. This prevents other listeners from grabbing and processing the message.
The update query acquires the advisory lock, which will block the sender until the response is ready.
After the handler processes the request, it can run the final update and set the response:
with updated as (
  update request_reply
  set state = 'replied', response = $2
  where id = $1
  returning id
)
select pg_advisory_unlock(id) from updated;
When the update is complete, you can unlock the advisory lock, which unblocks the sender so it can retrieve the response and return.
See it in action
I built a fun demo demonstrating request-reply that generates vector embeddings for a table in the database. You could run a background job with pg_cron that populates missing embeddings.
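For example, with the pg_cron extension installed, you could schedule the process_dune_quote_embeddings() procedure (defined below) to run every minute. This is a sketch, assuming pg_cron is available in your database:

```sql
-- run the embedding backfill once a minute; pg_cron runs each job in
-- its own session, outside a surrounding transaction, so CALL is allowed
select cron.schedule('* * * * *', $$call process_dune_quote_embeddings()$$);
```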
To make request()/await_reply() work well for queries running directly in Postgres, you can use a procedure. A procedure lets you orchestrate function calls. Importantly, you can commit in the middle of a procedure. So, you can:
- Call request().
- commit, which makes the new entry in request_reply visible to all sessions.
- Call await_reply().
- Handle the reply.
First, create a table of dune_quotes and insert some data into it:
-- the vector column type requires the pgvector extension
create extension if not exists vector;

create table if not exists dune_quotes (
  id serial primary key,
  quote text not null,
  embedding vector(1536)
);

insert into dune_quotes (quote) values
  ('i must not fear. fear is the mind-killer. fear is the little-death that brings total obliteration. i will face my fear. i will permit it to pass over me and through me. and when it has gone past i will turn the inner eye to see its path. where the fear has gone there will be nothing. only i will remain.'),
  ('he who controls the spice controls the universe.'),
  ('the mystery of life isn''t a problem to solve, but a reality to experience.');
Next, create a procedure that will generate embeddings for dune_quotes rows that have a null embedding:
create or replace procedure process_dune_quote_embeddings() as $$
declare
  quote_record record;
  request_id int;
  embedding_json json;
  embedding_array float[];
begin
  -- select quotes without embeddings
  for quote_record in select id, quote from dune_quotes where embedding is null loop
    -- request embedding
    request_id := request('embeddings', quote_record.quote);
    commit;

    -- wait for and retrieve the embedding
    embedding_json := await_reply(request_id)::json;

    -- parse the json response and convert to array
    -- assuming the embedding is directly an array of numbers in the json
    select array(select elem::float
                 from json_array_elements_text(embedding_json) as elem)
    into embedding_array;

    -- update the quote with the new embedding
    update dune_quotes
    set embedding = embedding_array::vector(1536)
    where id = quote_record.id;

    -- commit after each update to make it visible to other transactions
    commit;
  end loop;
end;
$$ language plpgsql;
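With a handler process listening on the embeddings channel, kicking off the backfill is then a one-liner:

```sql
call process_dune_quote_embeddings();
```

If no handler is connected, request() raises and the procedure errors out rather than blocking forever.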
The bones of this proof of concept are available on GitHub.
The neat part about this example is that we can do all this without needing an extension like pgsql-http. We can use native Postgres components and move all other logic over to our application.