Skip to main content

Postgres

ribs_postgres is the PostgreSQL backend for ribs_sql. It wraps the postgres Dart driver and exposes two concrete Transactor implementations:

ImplementationConnectionsUse case
PostgresTransactorSingle shared connectionScripts, low-traffic services
PostgresPoolTransactorpg.Pool-managed poolProduction, concurrent workloads

Both return Resource<Transactor>, guaranteeing the underlying connection or pool is closed cleanly on success, error, or fiber cancellation.

Placeholder conversion

The PostgreSQL wire protocol uses $1, $2, … positional placeholders, but ribs_sql uses ? throughout. ribs_postgres rewrites ? to $N automatically, so every Query, Update, and Fragment you write is identical to the SQLite equivalent.

PostgresTransactor — single connection

PostgresTransactor.create opens one connection when the Resource is acquired and reuses it for every transact() and stream() call. Transactions are serialised over this single connection, which keeps the setup simple but limits throughput under concurrent load.

PostgresTransactor
// PostgresTransactor.create opens a single shared connection when the Resource
// is acquired and closes it on release. All transact() and stream() calls share
// this one connection sequentially. For concurrent workloads prefer the pool.
final Resource<Transactor> singleXa = PostgresTransactor.create(
pg.Endpoint(
host: 'localhost',
database: 'mydb',
username: 'app',
password: 's3cr3t',
),
);

// An optional ConnectionSettings controls SSL mode and other driver options.
final Resource<Transactor> singleXaTls = PostgresTransactor.create(
pg.Endpoint(host: 'db.example.com', database: 'prod'),
settings: const pg.ConnectionSettings(sslMode: pg.SslMode.require),
);

pg.Endpoint accepts host, port (default 5432), database, username, and password. The optional settings parameter takes a pg.ConnectionSettings for controlling SSL mode, timeouts, and other connection-level options.

PostgresPoolTransactor — connection pool

PostgresPoolTransactor wraps pg.Pool from the postgres package. Each transact() call borrows a session from the pool, runs the ConnectionIO inside a database transaction, and returns the session automatically — even on error or fiber cancellation.

From endpoints

withEndpoints accepts a list of pg.Endpoint objects. When more than one endpoint is provided, the pool distributes new connections across them using round-robin selection, which is useful for read replicas or multi-node setups. pg.PoolSettings controls the pool size and SSL mode:

PostgresPoolTransactor.withEndpoints
// PostgresPoolTransactor.withEndpoints creates a pg.Pool backed by one or more
// Endpoints. The pool distributes connections across endpoints using round-robin
// selection. Use PoolSettings to cap pool size, set timeouts, and configure SSL.
final Resource<Transactor> poolXa = PostgresPoolTransactor.withEndpoints(
[
pg.Endpoint(
host: 'localhost',
database: 'mydb',
username: 'app',
password: 's3cr3t',
),
],
settings: const pg.PoolSettings(
maxConnectionCount: 10,
sslMode: pg.SslMode.disable,
),
);

From a URL

withUrl accepts a standard PostgreSQL connection URL. This is convenient when the connection string comes from an environment variable:

PostgresPoolTransactor.withUrl
// withUrl accepts a standard PostgreSQL connection URL.
// Format: postgresql://[user:password@]host[:port][/database]
final Resource<Transactor> urlXa = PostgresPoolTransactor.withUrl(
'postgresql://app:s3cr3t@localhost:5432/mydb',
);

Placeholder conversion

? placeholders in SQL strings and Fragment.param calls are automatically rewritten to $1, $2, … before the statement is sent to the driver. This means Query, Update, ParameterizedQuery, and Fragment look exactly the same as they do in the Overview examples:

? placeholders work unchanged
// The postgres wire protocol uses $1, $2, ... positional placeholders.
// ribs_postgres rewrites the ? placeholders used throughout ribs_sql to the
// $N form automatically, so you write the same SQL regardless of the backend.
ConnectionIO<Option<(String, int)>> findUser(String name) =>
(Fragment.raw('SELECT name, age FROM person WHERE name = ') + Fragment.param(name, Put.string))
.query((Read.string, Read.integer).tupled)
.option();

Example program

The following snippet defines an Employee domain model and runs a complete create-insert-query workflow against a pool transactor. All ConnectionIO steps compose with flatMap and execute inside a single BEGIN/COMMIT transaction:

End-to-end example
// ── Domain ───────────────────────────────────────────────────────────────────

final class Employee {
final int? id;
final String name;
final String department;

const Employee({this.id, required this.name, required this.department});
}

final employeeRead = (Read.integer.optional(), Read.string, Read.string).tupled.map(
(t) => Employee(id: t.$1.toNullable(), name: t.$2, department: t.$3),
);

// ── Schema ───────────────────────────────────────────────────────────────────

ConnectionIO<int> createEmployeeTable() =>
'''
CREATE TABLE IF NOT EXISTS employee (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
department TEXT NOT NULL
)'''.update0.run();

// ── Writes ───────────────────────────────────────────────────────────────────

ConnectionIO<int> insertEmployee(String name, String dept) =>
'INSERT INTO employee (name, department) VALUES (?, ?)'
.update((Write.string, Write.string).tupled)
.run((name, dept));

// Insert ... RETURNING reads the generated id back in one round-trip.
ConnectionIO<int> insertEmployeeReturning(String name, String dept) =>
'INSERT INTO employee (name, department) VALUES (?, ?) RETURNING id'
.updateReturning(
(Write.string, Write.string).tupled,
Read.integer,
)
.run((name, dept));

// ── Queries ──────────────────────────────────────────────────────────────────

final byDept = ParameterizedQuery(
'SELECT id, name, department FROM employee WHERE department = ? ORDER BY name',
employeeRead,
Write.string,
);

ConnectionIO<IList<Employee>> employeesIn(String dept) => byDept.ilist(dept);

// ── Program ──────────────────────────────────────────────────────────────────

// Build the transactor from a URL and run the entire program inside a single
// Resource scope. The pool is closed automatically when use() completes.
IO<IList<Employee>> program() => PostgresPoolTransactor.withUrl(
'postgresql://app:s3cr3t@localhost:5432/mydb',
).use((xa) {
final setup = createEmployeeTable()
.flatMap((_) => insertEmployee('Alice', 'Engineering'))
.flatMap((_) => insertEmployee('Bob', 'Engineering'))
.flatMap((_) => insertEmployee('Carol', 'Product'));

return setup.flatMap((_) => employeesIn('Engineering')).transact(xa);
});

program() returns IO<IList<Employee>>. Nothing is executed until the IO is run at the application boundary. The pool is opened and closed by the Resource scope inside withUrl(...).use(...).