Postgres
ribs_postgres is the PostgreSQL backend for ribs_sql. It wraps the
postgres Dart driver and exposes two
concrete Transactor implementations:
| Implementation | Connections | Use case |
|---|---|---|
| PostgresTransactor | Single shared connection | Scripts, low-traffic services |
| PostgresPoolTransactor | pg.Pool-managed pool | Production, concurrent workloads |
Both factories return a Resource<Transactor>, which guarantees that the
underlying connection or pool is closed cleanly on success, error, or fiber
cancellation.
The PostgreSQL wire protocol uses $1, $2, … positional placeholders, but
ribs_sql uses ? throughout. ribs_postgres rewrites ? to $N
automatically, so every Query, Update, and Fragment you write is
identical to the SQLite equivalent.
PostgresTransactor — single connection
PostgresTransactor.create opens one connection when the Resource is
acquired and reuses it for every transact() and stream() call. Transactions
are serialised over this single connection, which keeps the setup simple but
limits throughput under concurrent load.
// PostgresTransactor.create opens a single shared connection when the Resource
// is acquired and closes it on release. All transact() and stream() calls share
// this one connection sequentially. For concurrent workloads prefer the pool.
final Resource<Transactor> singleXa = PostgresTransactor.create(
  pg.Endpoint(
    host: 'localhost',
    database: 'mydb',
    username: 'app',
    password: 's3cr3t',
  ),
);

// An optional ConnectionSettings controls SSL mode and other driver options.
final Resource<Transactor> singleXaTls = PostgresTransactor.create(
  pg.Endpoint(host: 'db.example.com', database: 'prod'),
  settings: const pg.ConnectionSettings(sslMode: pg.SslMode.require),
);
pg.Endpoint accepts host, port (default 5432), database, username,
and password. The optional settings parameter takes a
pg.ConnectionSettings for controlling SSL mode, timeouts, and other
connection-level options.
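To make the throughput trade-off of a single connection concrete, here is a minimal, library-independent sketch of what "serialised" means in practice. SerialExecutor and fakeTransaction are hypothetical names invented for this illustration; they are not part of ribs_postgres, and the real transactor may queue work differently.

```dart
import 'dart:async';

/// Hypothetical helper: runs async tasks strictly one after another,
/// mimicking work queued on a single shared connection.
class SerialExecutor {
  Future<void> _tail = Future<void>.value();

  Future<T> run<T>(Future<T> Function() task) {
    // Start the task only after everything queued before it has finished.
    final result = _tail.then<T>((_) => task());
    // Swallow errors on the internal chain so one failure does not block
    // later tasks; the caller still sees the error through `result`.
    _tail = result.then<void>((_) {}, onError: (Object _) {});
    return result;
  }
}

Future<void> main() async {
  final executor = SerialExecutor();
  final log = <String>[];

  // Stand-in for a transaction that occupies the connection for `millis` ms.
  Future<void> fakeTransaction(String name, int millis) {
    return executor.run(() async {
      log.add('$name start');
      await Future<void>.delayed(Duration(milliseconds: millis));
      log.add('$name end');
    });
  }

  // Both are submitted at once, but B cannot start until A has finished.
  await Future.wait([fakeTransaction('A', 20), fakeTransaction('B', 5)]);
  print(log); // [A start, A end, B start, B end]
}
```

Although B is much shorter than A, it waits for A to release the shared resource. A pool avoids this by letting independent transactions run on separate connections.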
PostgresPoolTransactor — connection pool
PostgresPoolTransactor wraps pg.Pool from the postgres package. Each
transact() call borrows a session from the pool, runs the ConnectionIO
inside a database transaction, and returns the session to the pool
automatically, even on error or fiber cancellation.
From endpoints
withEndpoints accepts a list of pg.Endpoint objects. When more than one
endpoint is provided, the pool distributes new connections across them using
round-robin selection, which is useful for read replicas or multi-node setups.
pg.PoolSettings controls the pool size and SSL mode:
// PostgresPoolTransactor.withEndpoints creates a pg.Pool backed by one or more
// Endpoints. The pool distributes connections across endpoints using round-robin
// selection. Use PoolSettings to cap pool size, set timeouts, and configure SSL.
final Resource<Transactor> poolXa = PostgresPoolTransactor.withEndpoints(
  [
    pg.Endpoint(
      host: 'localhost',
      database: 'mydb',
      username: 'app',
      password: 's3cr3t',
    ),
  ],
  settings: const pg.PoolSettings(
    maxConnectionCount: 10,
    sslMode: pg.SslMode.disable,
  ),
);
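Round-robin selection across several endpoints can be pictured with the following plain-Dart sketch. RoundRobin is a hypothetical class written for this illustration, and the host names are made up; the actual selection logic lives inside the pool and may differ in detail.

```dart
/// Hypothetical selector: hands out items in round-robin order.
class RoundRobin<T> {
  RoundRobin(List<T> items) : _items = List.unmodifiable(items) {
    if (_items.isEmpty) {
      throw ArgumentError('at least one item is required');
    }
  }

  final List<T> _items;
  int _next = 0;

  /// Returns the next item and advances the cursor, wrapping at the end.
  T next() {
    final item = _items[_next];
    _next = (_next + 1) % _items.length;
    return item;
  }
}

void main() {
  // Made-up host names standing in for three endpoints.
  final hosts = RoundRobin(['db-1', 'db-2', 'db-3']);
  final picks = [for (var i = 0; i < 5; i++) hosts.next()];
  print(picks); // [db-1, db-2, db-3, db-1, db-2]
}
```

Each new connection goes to the next endpoint in the list, so connections spread evenly over time without tracking per-endpoint load.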
From a URL
withUrl accepts a standard PostgreSQL connection URL. This is convenient when
the connection string comes from an environment variable:
// withUrl accepts a standard PostgreSQL connection URL.
// Format: postgresql://[user:password@]host[:port][/database]
final Resource<Transactor> urlXa = PostgresPoolTransactor.withUrl(
  'postgresql://app:s3cr3t@localhost:5432/mydb',
);
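To see how the parts of such a URL map onto user, password, host, port, and database, the sketch below breaks one down with Dart's built-in Uri class. describeUrl is a hypothetical helper for illustration only; it shows how Dart parses the string, not necessarily how withUrl interprets it internally.

```dart
/// Hypothetical helper: splits a connection URL into its parts using
/// Dart's built-in Uri parser. Assumes the password contains no ':'.
Map<String, Object?> describeUrl(String url) {
  final uri = Uri.parse(url);
  final userInfo = uri.userInfo.isEmpty ? <String>[] : uri.userInfo.split(':');
  return {
    'scheme': uri.scheme,
    'user': userInfo.isNotEmpty ? userInfo[0] : null,
    'password': userInfo.length > 1 ? userInfo[1] : null,
    'host': uri.host,
    'port': uri.hasPort ? uri.port : null,
    'database': uri.pathSegments.isNotEmpty ? uri.pathSegments.first : null,
  };
}

void main() {
  print(describeUrl('postgresql://app:s3cr3t@localhost:5432/mydb'));
  // {scheme: postgresql, user: app, password: s3cr3t, host: localhost, port: 5432, database: mydb}
}
```

A helper like this can be handy for validating a connection string read from configuration before handing it to the transactor.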
Placeholder conversion
? placeholders in SQL strings and Fragment.param calls are automatically
rewritten to $1, $2, … before the statement is sent to the driver.
This means Query, Update, ParameterizedQuery, and Fragment look exactly
the same as they do in the Overview examples:
// The postgres wire protocol uses $1, $2, ... positional placeholders.
// ribs_postgres rewrites the ? placeholders used throughout ribs_sql to the
// $N form automatically, so you write the same SQL regardless of the backend.
ConnectionIO<Option<(String, int)>> findUser(String name) =>
    (Fragment.raw('SELECT name, age FROM person WHERE name = ') +
            Fragment.param(name, Put.string))
        .query((Read.string, Read.integer).tupled)
        .option();
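The rewrite itself is conceptually simple. The following is a minimal sketch of the idea in plain Dart, not the library's actual implementation: toPositional is a hypothetical function that numbers each ? in order of appearance and skips question marks inside single-quoted string literals. It deliberately ignores SQL comments, dollar-quoted strings, and operators that contain ?.

```dart
/// Hypothetical rewrite: replaces each `?` outside single-quoted string
/// literals with a numbered positional placeholder, in order of appearance.
String toPositional(String sql) {
  final out = StringBuffer();
  var inString = false;
  var index = 0;
  for (var i = 0; i < sql.length; i++) {
    final ch = sql[i];
    if (ch == "'") {
      // Toggle on every quote; an escaped quote ('') toggles twice,
      // which leaves the state unchanged.
      inString = !inString;
      out.write(ch);
    } else if (ch == '?' && !inString) {
      index++;
      out.write('\$$index');
    } else {
      out.write(ch);
    }
  }
  return out.toString();
}

void main() {
  print(toPositional('INSERT INTO employee (name, department) VALUES (?, ?)'));
  // INSERT INTO employee (name, department) VALUES ($1, $2)
  print(toPositional("SELECT 'what?' AS q FROM person WHERE name = ?"));
  // SELECT 'what?' AS q FROM person WHERE name = $1
}
```

Because placeholders are numbered by position, the order of the bound parameters must match the order of the ? markers in the SQL text.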
Example program
The following snippet defines an Employee domain model and runs a complete
create-insert-query workflow against a pool transactor. All ConnectionIO
steps compose with flatMap and execute inside a single BEGIN/COMMIT
transaction:
// ── Domain ───────────────────────────────────────────────────────────────────
final class Employee {
  final int? id;
  final String name;
  final String department;

  const Employee({this.id, required this.name, required this.department});
}

final employeeRead = (Read.integer.optional(), Read.string, Read.string).tupled.map(
  (t) => Employee(id: t.$1.toNullable(), name: t.$2, department: t.$3),
);
// ── Schema ───────────────────────────────────────────────────────────────────
ConnectionIO<int> createEmployeeTable() =>
    '''
    CREATE TABLE IF NOT EXISTS employee (
      id         SERIAL PRIMARY KEY,
      name       TEXT NOT NULL,
      department TEXT NOT NULL
    )'''.update0.run();
// ── Writes ───────────────────────────────────────────────────────────────────
ConnectionIO<int> insertEmployee(String name, String dept) =>
    'INSERT INTO employee (name, department) VALUES (?, ?)'
        .update((Write.string, Write.string).tupled)
        .run((name, dept));

// INSERT ... RETURNING reads the generated id back in one round-trip.
ConnectionIO<int> insertEmployeeReturning(String name, String dept) =>
    'INSERT INTO employee (name, department) VALUES (?, ?) RETURNING id'
        .updateReturning(
          (Write.string, Write.string).tupled,
          Read.integer,
        )
        .run((name, dept));
// ── Queries ──────────────────────────────────────────────────────────────────
final byDept = ParameterizedQuery(
  'SELECT id, name, department FROM employee WHERE department = ? ORDER BY name',
  employeeRead,
  Write.string,
);

ConnectionIO<IList<Employee>> employeesIn(String dept) => byDept.ilist(dept);
// ── Program ──────────────────────────────────────────────────────────────────
// Build the transactor from a URL and run the entire program inside a single
// Resource scope. The pool is closed automatically when use() completes.
IO<IList<Employee>> program() => PostgresPoolTransactor.withUrl(
  'postgresql://app:s3cr3t@localhost:5432/mydb',
).use((xa) {
  final setup = createEmployeeTable()
      .flatMap((_) => insertEmployee('Alice', 'Engineering'))
      .flatMap((_) => insertEmployee('Bob', 'Engineering'))
      .flatMap((_) => insertEmployee('Carol', 'Product'));

  return setup.flatMap((_) => employeesIn('Engineering')).transact(xa);
});
program() returns IO<IList<Employee>>. Nothing is executed until the IO
is run at the application boundary. The pool is opened and closed by the
Resource scope inside withUrl(...).use(...).