
Announcing functions

Eric Goldman
4 min read

Today we're shipping functions — custom code that runs directly in your sinks to filter, transform, and route database changes in single-digit microseconds.

Why functions

Sequin powers high-bandwidth, reliable, and consistent Postgres change data capture pipelines. In those pipelines, there are three fundamental operations you often need to perform on messages.

Filter: You want some, but not all, of the changes emitted from your database. You may want to apply explicit exclusion criteria, explicit inclusion criteria, or both.

Transform: The raw payload from your database is rarely the right shape. Perhaps you need to redact PII, reformat timestamps, or derive new fields.

Route: You need to direct messages to specific Kafka topics, HTTP endpoints, or indexes based on their contents.

Applying this kind of logic as early in your pipeline as possible pays off downstream: subsequent systems never have to process unnecessary data.

However, capturing this business logic in your database using triggers, functions, and the outbox pattern is complex and can add significant load to your database. Moving this logic downstream to separate services adds latency, overhead, and complexity to your architecture.

We considered various approaches, from interpolation to configuration-based rules. But complex data processing logic inevitably requires the full expressiveness of code. Rather than forcing you to learn different interfaces for each pipeline stage, we made a deliberate design choice: use the same "isomorphic" function primitive for filters, transforms, and routing.

This unified approach means you learn one interface and apply it everywhere in your pipeline, while still having the full power of custom code when you need it.

Function examples

Here's what each type of function looks like in practice:

Filter functions return true or false to determine which messages to process:

def filter(_action, record, _changes, _metadata) do
  record["region"] in ["us-west", "us-east"]
end

This filter passes only records whose region is us-west or us-east; every other change is dropped.
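Inclusion and exclusion criteria can live in the same filter. The sketch below keeps the region check from the example above and additionally drops rows flagged as test data. The `is_test` column is hypothetical, and the `defmodule` wrapper is there only so the snippet runs in plain Elixir; in Sequin you would write just the `def`.

```elixir
defmodule FilterSketch do
  # Keep a change only if it is in an allowed region AND is not flagged as test data.
  # "is_test" is a hypothetical column used for illustration.
  def filter(_action, record, _changes, _metadata) do
    record["region"] in ["us-west", "us-east"] and record["is_test"] != true
  end
end
```

A row with no `is_test` value at all still passes, because a missing key reads as `nil`, which is not `true`.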

Transform functions reshape your data into the JSON object you want delivered to your destination:

def transform(_action, record, _changes, metadata) do
  %{
    "user_id" => record["id"],
    "email" => String.downcase(record["email"]),
    "timestamp" => metadata.commit_timestamp
  }
end

This transform creates a clean payload with normalized email and commit timestamp.
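Redacting PII and deriving fields follow the same pattern. The sketch below drops two sensitive columns and keeps only the email's domain. The `ssn` column and the `email_domain` field are hypothetical, and we have not confirmed here that the restricted interpreter permits every call used (`Map.drop`, `String.split`, `List.last`), so treat it as plain Elixir that may need adjusting. The `defmodule` wrapper is only there to make it runnable outside Sequin.

```elixir
defmodule TransformSketch do
  def transform(_action, record, _changes, metadata) do
    # Derive a non-sensitive field before the raw email is dropped.
    # A missing or non-string email yields nil instead of raising.
    email_domain =
      case record["email"] do
        email when is_binary(email) ->
          email |> String.downcase() |> String.split("@") |> List.last()

        _ ->
          nil
      end

    record
    # Remove the sensitive columns ("ssn" is a hypothetical column).
    |> Map.drop(["email", "ssn"])
    |> Map.put("email_domain", email_domain)
    |> Map.put("timestamp", metadata.commit_timestamp)
  end
end
```

Unlike the example above, this version passes through every column it does not explicitly drop, so new columns reach the destination without a function change. Whether that is desirable depends on how strictly you want to control the payload.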

Routing functions dynamically determine where messages should be delivered:

def route(_action, _record, _changes, metadata) do
  %{
    "topic" => "#{metadata.table_schema}.#{metadata.table_name}"
  }
end

This routing function sends messages to Kafka topics named after their source table.
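Routing can also depend on the contents of the record rather than only on its source table. The sketch below appends the record's region to the topic name and falls back to a catch-all suffix when the column is empty. The `region` column and the `unknown` suffix are assumptions for illustration, and the `defmodule` wrapper exists only so the snippet runs in plain Elixir.

```elixir
defmodule RouteSketch do
  def route(_action, record, _changes, metadata) do
    # Fall back to "unknown" when the (hypothetical) region column is nil or absent.
    region = record["region"] || "unknown"

    %{
      "topic" => "#{metadata.table_name}.#{region}"
    }
  end
end
```

With this function, a change to the users table with region us-west would be sent to the topic users.us-west.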

Why Mini Elixir

The syntax in the examples above is Elixir—which might not be an obvious choice for a CDC platform. Most teams haven't written Elixir, and there are more popular languages we could have chosen.

This choice was driven by performance requirements. Sequin is designed to provide the highest-throughput, lowest-latency CDC on the market. To deliver on this promise at a scale of 50,000 messages per second per sink, functions need to execute in single-digit microseconds.
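To see where the microsecond figure comes from, it helps to work out the time budget per message. The sketch below assumes a sink handles messages strictly one at a time, which is a simplification; the post does not say how work is batched or parallelized.

```elixir
defmodule LatencyBudget do
  # Microseconds available per message, assuming messages are handled strictly
  # one at a time (an assumption; real sinks may batch or parallelize).
  def per_message_us(messages_per_second) do
    1_000_000 / messages_per_second
  end
end

IO.inspect(LatencyBudget.per_message_us(50_000))
# prints 20.0
```

Under that assumption, a filter, a transform, and a routing function share roughly 20 microseconds per message, or about 6 to 7 microseconds each if split evenly, before counting anything else the sink has to do.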

We explored several approaches to meet these performance requirements while balancing our concerns around security and simplicity. Ultimately, a restricted AST interpreter of our host language (Elixir) delivers unmatched throughput.
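For readers unfamiliar with the idea, here is a rough sketch of one piece of a restricted interpreter: parsing source into an AST and rejecting any module call that is not on an allowlist. This is not Sequin's implementation. The module name, the allowlist contents, and the return shape are all invented for illustration, and a real interpreter would also have to evaluate the code and restrict local calls, macros, and resource use.

```elixir
defmodule AllowlistSketch do
  # Hypothetical allowlist of remote calls as {module, function, arity}.
  # Access.get/2 is what record["key"] expands to in the AST.
  @allowed_remote [{String, :downcase, 1}, {Access, :get, 2}]

  # Returns :ok if every Module.fun(args) call in the source is allowlisted,
  # otherwise {:error, rejected_calls} in source order.
  def check(source) do
    {:ok, ast} = Code.string_to_quoted(source)
    {_ast, rejected} = Macro.prewalk(ast, [], &collect/2)
    if rejected == [], do: :ok, else: {:error, Enum.reverse(rejected)}
  end

  # A remote call looks like {{:., meta, [target, fun]}, meta, args} in the AST.
  defp collect({{:., _, [target, fun]}, _, args} = node, acc)
       when is_atom(fun) and is_list(args) do
    case target do
      # Elixir module written as an alias, e.g. String or File.
      {:__aliases__, _, parts} ->
        verdict(node, {Module.concat(parts), fun, length(args)}, acc)

      # Module given as a bare atom, e.g. :os (Erlang) or the expanded Access.
      mod when is_atom(mod) ->
        verdict(node, {mod, fun, length(args)}, acc)

      # Anything else, e.g. metadata.commit_timestamp (field access on a variable),
      # is not checked by this sketch.
      _other ->
        {node, acc}
    end
  end

  defp collect(node, acc), do: {node, acc}

  # Record the call as rejected unless it is on the allowlist.
  defp verdict(node, call, acc) do
    if call in @allowed_remote, do: {node, acc}, else: {node, [call | acc]}
  end
end
```

For example, `AllowlistSketch.check("File.read(path)")` returns `{:error, [{File, :read, 1}]}`, while a call to `String.downcase` on a record field returns `:ok`. Checking the AST up front like this is one way such a design can keep untrusted code away from the file system or the OS while still running inside the host process.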

Because most teams haven't written Elixir, we launched functions with a Copy for ChatGPT button that gives LLMs all the context they need to quickly translate requirements (or functions written in the language of your choice) into working “Mini Elixir”. Early users report this eliminates the learning curve entirely.

Building and testing

Of course, the functions you’ll write in Sequin will replace or build upon business logic in your codebase. So we designed functions to fit easily into your development workflow and CI/CD process.

Navigate to the new Functions tab in the Sequin Console to write and test functions:

[Screenshot: Sequin Console IDE for writing and testing functions]

Then commit your functions to version control via sequin.yaml to test functions end-to-end in your local environment and in CI/CD. It’s helpful to see everything in plain text:

functions:
  region_filter:
    type: filter
    code: |
      def filter(_action, record, _changes, _metadata) do
        record["region"] in ["us-west", "us-east"]
      end
      
  clean_transform:
    type: transform
    code: |
      def transform(_action, record, _changes, metadata) do
        %{
          "user_id" => record["id"],
          "email" => String.downcase(record["email"]),
          "timestamp" => metadata.commit_timestamp
        }
      end
      
  table_router:
    type: routing
    code: |
      def route(_action, _record, _changes, metadata) do
        %{
          "topic" => "#{metadata.table_schema}.#{metadata.table_name}"
        }
      end
      
sinks:
  - name: users_to_kafka
    table_name: users
    filter: region_filter
    transform: clean_transform
    routing: table_router
    destination:
      type: kafka
      bootstrap_servers: "${KAFKA_BOOTSTRAP_SERVERS}"
      topic: users
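Because the functions are plain text, you can also sanity-check them outside Sequin before committing. The sketch below copies the three functions from the YAML above into a module and asserts on their results with pattern matches. The sample record and metadata are hypothetical; the real message shape comes from Sequin, and this does not replace an end-to-end test against a running sink.

```elixir
defmodule SinkFunctions do
  # The three functions from sequin.yaml, copied unchanged.
  def filter(_action, record, _changes, _metadata) do
    record["region"] in ["us-west", "us-east"]
  end

  def transform(_action, record, _changes, metadata) do
    %{
      "user_id" => record["id"],
      "email" => String.downcase(record["email"]),
      "timestamp" => metadata.commit_timestamp
    }
  end

  def route(_action, _record, _changes, metadata) do
    %{
      "topic" => "#{metadata.table_schema}.#{metadata.table_name}"
    }
  end
end

# Hypothetical sample change; "insert" is a placeholder for the action argument,
# which these functions ignore.
record = %{"id" => 7, "email" => "Ada@Example.com", "region" => "us-west"}

metadata = %{
  table_schema: "public",
  table_name: "users",
  commit_timestamp: "2025-01-01T00:00:00Z"
}

# Each match raises MatchError if a function returns something unexpected.
true = SinkFunctions.filter("insert", record, nil, metadata)

%{"user_id" => 7, "email" => "ada@example.com"} =
  SinkFunctions.transform("insert", record, nil, metadata)

%{"topic" => "public.users"} = SinkFunctions.route("insert", record, nil, metadata)
```

Saved as an .exs file, this runs with the standard `elixir` command and exits with an error on the first failed match, which makes it easy to wire into a CI step.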

Get started

Functions are available on all sinks. To get started, run Sequin locally and read our reference guides for filter, transform, and routing functions.

If you have any questions, join our Discord server or Slack community and tell us what you’re building.