We're Sequin, a Postgres CDC tool that streams changes to streams and queues like Kafka, SQS, HTTP endpoints, and more. Stable, sequential ordering is a very useful thing to have when building a streaming system. But Postgres' behavior here can be surprising! We go down the rabbit hole below.
An under-appreciated fact is that while MVCC (multi-version concurrency control) systems like Postgres are strongly consistent, they can appear to be eventually consistent from a client's perspective. What do I mean?
Let's say you have a field, inserted_at, for some table:
create table access_logs (
id serial primary key,
user_name text,
inserted_at timestamp default now()
);
Your system only sets inserted_at on insert, so it's effectively immutable.
You run a query like:
select * from access_logs
order by inserted_at desc
limit 3
It's possible that the first time you execute that query, you receive a result set like this:
Result:
id | user_name | inserted_at ∇
---+-----------+------------------------
4 | Paul | 2024-08-19 10:00:00
2 | Irina | 2024-08-19 09:59:58
1 | Feyd | 2024-08-19 09:59:00
And then you query right afterwards and get a result like this:
Result:
id | user_name | inserted_at ∇
---+-----------+------------------------
4 | Paul | 2024-08-19 10:00:00
3 | Duncan | 2024-08-19 09:59:59
2 | Irina | 2024-08-19 09:59:58
Note how "Duncan" seemingly popped into the second position. Before, there were no records between "Irina" and "Paul." Now "Duncan" is between them. What's happening here?
A visual will help. Below, "Pid 1" and "Pid 2" are both inserting into the table at the same time. Pid 1 starts first, but for whatever reason, finishes after Pid 2:
The first query runs after Pid 2 commits, but before Pid 1 commits. The second query runs after both have committed.
If the inserted_at field in Pid 1 was set to now() before the inserted_at field in Pid 2, you'll end up with this blip. Default values like now() resolve at the beginning of a query, not at the end: Postgres fills them in at the start, then spends the rest of its time checking constraints, writing to disk, updating indexes, and so forth.
To see this for yourself, you can run three commands in series like so:
select now();
insert into access_logs (user_name)
-- for example, generate a long string
select expensive_operation()
from generate_series(1, 5);
select now();
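Here, expensive_operation() is just a placeholder. If you want something runnable, a hypothetical stand-in like this works (it sleeps one second per row, so the insert takes roughly five seconds):
-- Hypothetical stand-in for expensive_operation(): sleep, then return a string.
-- plpgsql functions are volatile by default, so this runs once per row.
create or replace function expensive_operation()
returns text as $$
begin
  perform pg_sleep(1);
  return md5(random()::text);
end;
$$ language plpgsql;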
Even if the INSERT takes many seconds to run, you'll notice that the inserted_at for all rows is the same. And that inserted_at is set to the timestamp near the beginning of the query.
How might Pid 2 "beat" Pid 1? It may be unlikely, but the point is that it can. These operations run in separate processes, and there is no guarantee on ordering. Things like latency differences between the machine running Pid 1's query and the machine running Pid 2's query can have an effect.
What about sorting by the primary key, a SERIAL type? Can't we just sort by id to get a consistent view?
SERIAL is a pseudo-type. It creates an integer column that automatically gets its value from a sequence. To better understand how the sequences behind SERIAL work, I'll add a new column to access_logs, seq. That will make it easier to see what's going on:
create sequence access_logs_seq;
create table access_logs (
id serial primary key,
user_name text not null,
inserted_at timestamp default now(),
seq integer default nextval('access_logs_seq')
);
We get the next value of a sequence by calling nextval().
Unlike now(), nextval() is called once per row inserted. The flow looks like this:
- Generate seq for record 1 by calling nextval().
- ...do the work to insert record 1.
- Generate seq for record 2 by calling nextval().
- ...do the work to insert record 2.
- etc.
This is because now() is marked as a stable function, whereas nextval() is volatile. A stable function only needs to be evaluated once for a given statement, and now() in particular returns the transaction start time, so it's the same for the whole query or transaction:
begin;
select now();
-- => 2024-07-19 14:38:57.788156-07
-- sleep for 5 seconds
select pg_sleep(5);
select now();
-- => 2024-07-19 14:38:57.788156-07
commit;
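Contrast that with nextval(), which advances the sequence on every call, even mid-transaction and mid-statement (the values below are illustrative):
begin;
select nextval('access_logs_seq');
-- => 42
select nextval('access_logs_seq');
-- => 43
commit;

-- and within a single statement, once per row:
select nextval('access_logs_seq') from generate_series(1, 3);
-- => 44, 45, 46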
The important point is that values for a row are set throughout the duration of a query or transaction in Postgres; they don't all resolve at the end, on commit. Furthermore, sequences are non-transactional: once a transaction calls nextval(), other transactions can draw later values before the first one commits.
This is all, of course, intentional. MVCC is a powerful paradigm that gives each process/transaction a ton of helpful guarantees, such as a consistent snapshot of the database. And it lets these processes proceed independently without blocking each other, which means highly concurrent throughput. This behavior might feel strange, but Postgres shuffling around the values of inserted_at or seq at the end of your transaction to order things would be even stranger.
This query volatility from the client's perspective is usually not a problem. However, there's one very notable and common situation where this causes big issues.
Cursor-based pagination
Cursor-based pagination is a strategy for processing all the rows in a table.
In cursor-based pagination, clients traverse a table with a cursor. For example, if your cursor is seq, you might traverse a table like so:
select * from access_logs
where seq > ?
order by seq asc
limit 1000
When you receive the results, you use the max(seq) as your next cursor.
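Concretely, two consecutive pages might look like this (assuming the last row of the first page had seq = 1000):
-- page 1
select * from access_logs
where seq > 0
order by seq asc
limit 1000;

-- the max(seq) in page 1 was 1000, so page 2 becomes:
select * from access_logs
where seq > 1000
order by seq asc
limit 1000;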
However, due to query volatility at the tail, you risk skipping rows. In the access_logs example above, the first query might return rows with seq values 1, 2, and 4. If you move the cursor to max(seq), you'll skip the row with seq=3, which is still committing.
This is one reason so many APIs are eventually consistent. I've noticed eventual consistency at the tail of many APIs, most recently Salesforce, Stripe, and HubSpot. This is a huge bummer, as cursor-based pagination is the only way to extract tables from these APIs. (I'm assuming this behavior is pronounced with these APIs because they are doing inserts/updates inside of larger transactions.)
Mitigation
To mitigate pagination issues, one option is to simply "avoid the tail." If you're not paginating "near the end" of a result set, this problem is unlikely to happen.
However, this is not an option in systems that want to process records in real-time.
So your options, at a high-level, are either:
- Serialize your writes
- Add a filter to your reads
I dig into both paths below.
Serializing writes is a relatively common mitigation, and what we explored first when we encountered this issue. It's reasonable, well-trodden territory.
Filtering your reads is far less common. Exploring the options and landing a decent solution here took me down a fun rabbit hole. In the end, I found something novel that works well.
Serializing writes with a sequence table
You can serialize your writes with a sequence table. Instead of using a Postgres sequence, you can create a new table that looks like this:
create table my_seq (
id UUID primary key,
seq BIGINT not null default 0
);
You could use one row in my_seq for a whole table, or you could shard your sequence. For example, let's say you want strict ordering for your access_logs table. But you only need strict ordering per account in your system. In that case, the id in my_seq will map to the access_logs.account_id column.
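For that per-account example, access_logs is assumed to carry an account_id column, with seq now filled in by the trigger below rather than by a Postgres sequence. A minimal sketch:
-- Assumed shape of access_logs for the per-account sequence example
create table access_logs (
  id serial primary key,
  account_id uuid not null,   -- maps to my_seq.id
  user_name text not null,
  inserted_at timestamp default now(),
  seq bigint                  -- set by the trigger below
);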
You can use a trigger function + trigger to set access_logs.seq on insert (assuming access_logs is insert-only):
create or replace function next_seq()
returns trigger as $$
declare
i my_seq.seq%type;
begin
insert into my_seq (id, seq) values (new.account_id, 1)
on conflict (id) do update
set seq = my_seq.seq + 1
returning seq into i;
new.seq = i;
return new;
end;
$$ language plpgsql;
create trigger trigger_access_logs_next_seq
before insert on access_logs for each row execute procedure next_seq();
This eliminates race conditions. Now, when Pid 1 updates the row in my_seq, it will lock the row. Pid 2 will need to wait until Pid 1 commits before it can lock the row, update it, and commit:
Of course, Pid 2 could beat Pid 1 to locking my_seq. But that doesn't matter: Pid 1 would just wait until Pid 2 finished before starting, turning the queries from parallel to serial.
Optimization 1: Mindful my_seq locking
The main drawback of this approach is that your insert throughput takes a hit. Writes now funnel through updates to my_seq, which must happen serially.
For us, this quickly became a dealbreaker. Write throughput was much lower than we needed, and the lock durations between concurrent writes were very high (seconds).
A significant optimization we found is to make sure you don't touch my_seq until the last possible moment in your transaction. That means a flow like this:
- Open transaction.
- Insert rows.
- Update the rows we inserted, setting seq.
- Commit transaction.
i.e.:
begin;
-- perform ~expensive batch insert
insert into access_logs
-- ...
-- perform much cheaper update
update access_logs
set seq = nextval('access_logs_seq')
where id = any($1);
commit;
To make this work, you'll drop your trigger functions, as you want to control when seq is set.
This improved performance because our insert operations could contain large batches, and so took a non-trivial amount of time to run. Moving the seq-setting to the very end ensures a transaction only locks my_seq for the shortest amount of time possible.
Optimization 2: Put the seq on a different table
The drawback of the prior optimization is that Postgres performs copy-on-write (vs update-in-place). So Postgres actually writes the rows to disk twice here: once on insert and again when updating seq.
If your table has "heavy" rows (i.e. each row > ~100kb), you can optimize further by creating a new table which will hold your seq:
create table access_logs_seqs (
id UUID primary key,
-- move seq from access_logs to this table
seq BIGINT not null
);
Now, to get the seq for a given access_log, join against access_logs_seqs. When inserting into access_logs, do so in a transaction. Then, after the insert, insert into access_logs_seqs using your counter (a sketch follows the list below):
- Open transaction.
- Insert into access_logs.
- For each access_log, insert into access_logs_seqs.
- Commit.
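A minimal sketch of that flow, assuming a heavy payload column, uuid ids shared by both tables, and an illustrative staging_rows source for the batch (none of which are prescribed above):
begin;

-- 1. Insert the heavy rows, capturing their ids
with inserted as (
  insert into access_logs (id, user_name, payload)
  select gen_random_uuid(), user_name, payload
  from staging_rows
  returning id
)
-- 2. Record one lightweight seq row per inserted access_log
--    (a plain sequence here; the my_seq upsert from earlier works too)
insert into access_logs_seqs (id, seq)
select id, nextval('access_logs_seq')
from inserted;

commit;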
Again, I only recommend this complexity if rows tend to be heavy.
There's one more little optimization you can make, which is incrementing the counter in batches (e.g. by 100 if your batch size is 100) instead of 1-by-1, but I assume the gains are trivial.
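For reference, that would look something like a single bump of the my_seq counter per batch:
-- Reserve a block of 100 seq values in one update; the batch then
-- assigns (seq - 99) .. seq to its rows locally.
update my_seq
set seq = seq + 100
where id = $1
returning seq;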
Intelligently restrict your reads
(Or not so intelligently – I'll let you be the judge 😅)
Serializing your writes is usually the way to go. But, if you want some entertaining options to keep your writes concurrent yet read them safely, read on:
Earlier, I mentioned one mitigation strategy was to "avoid the tail" of a table. The blunt way to do this is to set a max/upper limit on your queries. For example, you can add a clause like:
where inserted_at < now() - interval '30 seconds'
But again, this is blunt, and may be undesirable.
The other route is to find some way to read your table up to the current consistent state.
Let's go back to the example at the beginning of this post. Ideally, we wouldn't let reads see Paul (inserted by Pid 2) until Pid 1 was finished and it was "safe" to do so. Is there a way to create such a precise upper bound?
I explored a couple solutions before landing on something novel. The journey is not for the faint of heart.
pg_snapshot_xmin
One option is to use pg_snapshot_xmin, as outlined in this post. In Postgres, each transaction has an ID. These IDs increase in strict order. You can call this function:
select pg_snapshot_xmin(pg_current_snapshot())
To get the current minimum active transaction ID.
This means you can put a transaction_id on your table:
alter table access_logs
add column transaction_id xid8
not null
default pg_current_xact_id();
Then, on inserts, the value of transaction_id is set to the current transaction's ID via pg_current_xact_id().
Then, on reads, you can treat transaction_id as your ceiling like so:
select * from access_logs
where seq > {cursor}
and transaction_id < pg_snapshot_xmin(pg_current_snapshot())
This gets rid of your read races. In our example, while Pid 2's inserts will be visible, we'll skip them because Pid 1 is still running and its transaction ID is returned by pg_snapshot_xmin(pg_current_snapshot()).
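To make that concrete with illustrative transaction IDs: say Pid 1 was assigned xid 1001 (it started writing first) and Pid 2 was assigned xid 1002. While Pid 1 is still running:
select pg_snapshot_xmin(pg_current_snapshot());
-- => 1001, Pid 1's xid: the lowest transaction ID still in progress

-- so Pid 2's committed rows (transaction_id = 1002) are filtered out by
-- transaction_id < 1001, until Pid 1 commits and the xmin advances.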
What's bad about this approach is that any long-running transaction in your system will set a ceiling. So, if you have a transaction running against another table for two minutes, your cursor/query will be unable to read the last two minutes of access_logs. As such, this approach was a non-starter for me.
pg_stat_activity
Getting a little crazier, you could use pg_stat_activity to achieve something similar, with a little more precision.
First, you'd add transaction_id to your table, setting it as above via pg_current_xact_id() on inserts/updates/as needed.
Then, you can "mark" the queries you care about with a leading comment like so:
-- tag:access_logs.insert
insert into access_logs
(...)
With that, this query will return the minimum transaction ID of any queries you care about that are currently touching that table:
with max_transaction_id as (
select min(backend_xmin) as txid from pg_stat_activity
where state = 'active'
and query like '-- tag:access_logs.insert%'
)
select * from access_logs
where seq > {cursor} and
transaction_id < (select txid from max_transaction_id);
Besides the aneurysms this will cause on the pgsql-hackers mailing list, an issue with this approach is that pg_stat_activity.backend_xmin is of type xid, not the newer xid8. Postgres "wraps around" (i.e. resets) xid transaction IDs every couple billion transactions. (The xid8 that pg_current_xact_id() returns includes an epoch, so it is not subject to the wraparound.) Handling wraparound sounds like a good time. I'll leave dealing with that as an exercise for the reader/Claude Sonnet.
Another issue is the brittleness of using a comment with string matching, but if you're daring enough to take this route, that should be unconcerning to you.
I wasn't daring enough to take this route, and so pressed on.
Advisory locks
Many Postgres rabbit holes end up in advisory locks, and this time was no different. From the Postgres docs:
PostgreSQL provides a means for creating locks that have application-defined meanings. These are called advisory locks, because the system does not enforce their use — it is up to the application to use them correctly.
Advisory locks can be useful for locking strategies that are an awkward fit for the MVCC model.
Advisory locks are basically the one way you can expose observable state from inside an open transaction for other sessions to read from the outside. Further, you can obtain a lock that's tied to the transaction, so that if the transaction ends (or the process dies), the lock is released.
These are the two key properties needed in a solution.
You can run this to obtain a transaction-level advisory lock:
begin;
-- must be a bigint
select pg_try_advisory_xact_lock(100);
And then, in other sessions, the lock will be visible in the pg_locks table:
select objid from pg_locks where locktype = 'advisory';
=> 100
The bigint restriction is a little limiting. You want to be able to communicate two pieces of information:
- this lock relates to the access_logs insert shenanigans
- here is a safe "ceiling" value you can use for your reads (e.g. "here is my seq")
There's a 2-arity version of this function that gets you closer:
select pg_try_advisory_xact_lock(classid int, objid int)
The first argument sets what's called the classid. This lets you group advisory locks. In your application, you might define that classid=1 pertains to advisory locks relating to inserting into access_logs.
The problem here is that both arguments in the 2-arity version are int, not bigint. So, you can't use seq (a bigint) for the objid. And you can't use pg_current_xact_id(), which returns an xid8, either.
You can stack locks, however. A transaction can hold multiple advisory locks. So, here's what I propose:
On inserts, grab two locks like so:
begin;
select pg_try_advisory_xact_lock_shared(1, 0);
select pg_try_advisory_xact_lock(
(select last_value + 1 from access_logs_seq)
);
insert into access_logs
(...);
commit;
The first lock says "this session (pid) is acquiring a lock related to inserting into access_logs (classid=1)." We use the _shared version as we want to allow multiple sessions to acquire this lock.
The second lock says "the minimum seq value that will be used by this session is select last_value + 1 from access_logs_seq." You haven't called nextval() yet, so that's not guaranteed to be the next value. But you know the next value will be greater than or equal to last_value + 1.
If you were to select * from pg_locks, you'd see:
Result:
pid | classid | objid
-----+---------+----------
1234 | 1 | 0
1234 | 0 | 89172
The first entry is the lock with classid set, indicating this pid is operating on access_logs. The second contains objid, which holds the value for seq. (With the 1-arity lock function, pg_locks stores the high 32 bits of the bigint key in classid and the low 32 bits in objid, so classid shows up as 0 as long as seq fits in 32 bits.)
That means you can get the minimum seq for a running transaction by joining the two together:
-- l1 refers to the row with the `objid`
-- l2 refers to the row with the `classid`
select l1.objid from pg_locks l1
inner join pg_locks l2
on l1.pid = l2.pid
where l2.classid = 1
and l1.classid = 0
and l1.locktype = 'advisory';
So, to get the minimum seq across all running transactions, you just need to use min:
select min(l1.objid) from pg_locks l1
(...);
Nifty!
Pulling this all together, you can now read from access_logs using a safe seq ceiling like so:
with max_seq as (
select min(l1.objid) as seq
from pg_locks l1
inner join pg_locks l2 on l1.pid = l2.pid
where l2.classid = 1
and l1.classid = 0
and l1.locktype = 'advisory'
)
select *
from access_logs
where seq > $1
and (
(select seq from max_seq) is null
or seq < (select seq from max_seq)
);
Above, note the clause that handles the situation where there are no locks ((select seq from max_seq) is null).
You can, of course, use Postgres functions to encapsulate this logic. You can even make something generic that you can use across many sequences:
create or replace function register_sequence_transaction(sequence_name text)
returns void as $$
declare
seq_id oid;
last_val bigint;
begin
-- Get the OID for the given sequence
select oid into seq_id
from pg_class
where relname = sequence_name and relkind = 'S';
if seq_id is null then
raise exception 'Sequence % does not exist', sequence_name;
end if;
-- Get the last value used by the sequence
-- (pg_sequences.last_value is null until the sequence is first used)
select coalesce(last_value, 0) into last_val
from pg_sequences
where sequencename = sequence_name;
-- Acquire a lock on this sequence
perform pg_try_advisory_xact_lock_shared(seq_id::int, 0);
-- Acquire a lock holding the last value used;
-- this session's rows will all have seq > last_val
perform pg_try_advisory_xact_lock(last_val);
end;
$$ language plpgsql;
create or replace function max_safe_seq(sequence_name text)
returns bigint as $$
declare
seq_id oid;
max_seq bigint;
begin
-- Get the OID for the given sequence
select oid into seq_id
from pg_class
where relname = sequence_name and relkind = 'S';
if seq_id is null then
raise exception 'Sequence % does not exist', sequence_name;
end if;
-- Find the minimum seq across all running transactions
select min(l1.objid) into max_seq
from pg_locks l1
inner join pg_locks l2 on l1.pid = l2.pid
where l2.classid = seq_id::int
and l1.classid = 0
and l1.locktype = 'advisory';
-- If no locks are found, return the maximum possible bigint value
if max_seq is null then
return 9223372036854775807;
end if;
return max_seq;
end;
$$ language plpgsql;
Then you can use the function on inserts like this:
begin;
select register_sequence_transaction('access_logs_seq');
insert into access_logs
(...);
commit;
And use it to filter reads like this:
select *
from access_logs
where seq > $1
and seq <= max_safe_seq('access_logs_seq');
Much more readable for future developers on the codebase.
With this approach, you leverage advisory locks for what they were intended for (areas where MVCC is an awkward fit). You can have uncapped, parallel writes to a table. And then set a ceiling on your reads that ensures clients won't perceive any eventual consistency.
How might Postgres make this easier?
Postgres provides plenty of ways to turn parallel writes into serial writes in order to avoid issues with MVCC when needed. The read side feels less supported, and the advisory locks solution took me a while to come up with.
If the 2-arity version of pg_try_advisory_xact_lock accepted a bigint as the second argument, that would have obviously made things easier. In that case, I'd be able to store enough information in a single lock.
I also wonder if it makes sense to have something like max_safe_seq supported first-class for sequences!