# Streaming Results
When queries return many rows, loading everything into memory at once can be problematic. Effect SQL supports streaming results row-by-row using Effect's `Stream` type.
## Why Streaming?
Consider fetching a million rows:
```ts
// ❌ Loads everything into memory
const allUsers = yield* sql`SELECT * FROM users` // 1M rows in memory!

// ✅ Stream processes rows incrementally
yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.tap(processUser),
  Stream.runDrain
)
```

Streaming is useful when:
- Result sets are large (thousands+ rows)
- Processing can happen incrementally
- Memory is constrained
- You're exporting or transforming data
## Basic Streaming
Every statement has a `.stream` property:
```ts
import { Stream, Effect, Console } from "effect"
import { SqlClient } from "@effect/sql"

const sql = yield* SqlClient.SqlClient

// Stream all users
const userStream = sql<User>`SELECT * FROM users`.stream

// Process each row
yield* userStream.pipe(
  Stream.tap((user) => Console.log(`Processing: ${user.name}`)),
  Stream.runDrain
)
```

## Collecting Results
### To Array
```ts
import { Chunk } from "effect"

const users = yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.runCollect,
  Effect.map(Chunk.toReadonlyArray)
)
```

### First N Items
```ts
const firstTen = yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.take(10),
  Stream.runCollect
)
```

### Fold/Reduce
```ts
const totalAge = yield* sql<{ age: number }>`SELECT age FROM users`.stream.pipe(
  Stream.runFold(0, (sum, user) => sum + user.age)
)
```

## Transformations
### Map
```ts
const names = sql<{ name: string }>`SELECT name FROM users`.stream.pipe(
  Stream.map((row) => row.name)
)
```

### Filter
```ts
const adults = sql<User>`SELECT * FROM users`.stream.pipe(
  Stream.filter((user) => user.age >= 18)
)
```

## Batch Processing
Process in chunks:
```ts
yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.grouped(100), // Groups of 100
  Stream.mapEffect((batch) => processBatch(batch)),
  Stream.runDrain
)
```

## Database Cursors
### PostgreSQL
PostgreSQL streaming uses cursors:
```ts
// Results are fetched in batches from the server
yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.runForEach(processUser)
)
```

The cursor fetches rows in chunks (typically 128 at a time), so memory usage stays constant regardless of total result size.
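To watch the incremental fetching, you can log progress as rows arrive. A minimal sketch, assuming a `users` table and a configured client:

```ts
import { Console, Effect, Stream } from "effect"
import { SqlClient } from "@effect/sql"

// Sketch: log every 10,000th row while streaming. Memory stays flat
// because only the current cursor batch is resident at any time.
const countUsers = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient
  yield* sql`SELECT * FROM users`.stream.pipe(
    Stream.zipWithIndex,
    Stream.tap(([, index]) =>
      (index + 1) % 10_000 === 0
        ? Console.log(`Streamed ${index + 1} rows so far`)
        : Effect.void
    ),
    Stream.runDrain
  )
})
```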
### SQLite
SQLite doesn't have true cursors, so results are loaded into memory. For large datasets, use `LIMIT`/`OFFSET`:
```ts
const streamLargeSqlite = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  let offset = 0
  const limit = 1000

  while (true) {
    const batch = yield* sql`
      SELECT * FROM users
      ORDER BY id
      LIMIT ${limit} OFFSET ${offset}
    `
    if (batch.length === 0) break

    for (const user of batch) {
      yield* processUser(user)
    }
    offset += limit
  }
})
```

## Error Handling
Handle errors in streams:
```ts
yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.mapEffect(processUser),
  Stream.catchAll((error) => {
    console.error("Stream error:", error)
    return Stream.empty
  }),
  Stream.runDrain
)
```

Or let errors propagate:
```ts
const result = yield* sql`SELECT * FROM users`.stream.pipe(
  Stream.mapEffect(processUser),
  Stream.runDrain
).pipe(
  Effect.catchTag("SqlError", handleSqlError)
)
```

## Resource Management
Streams are properly scoped, so database resources are released when the stream completes:
```ts
// Connection is held while streaming
yield* sql`SELECT * FROM large_table`.stream.pipe(
  Stream.take(100), // Early termination
  Stream.runDrain
)
// Connection released after the stream completes (even on early termination)
```

## Practical Examples
### CSV Export
```ts
import { Effect, Stream } from "effect"
import { NodeSink } from "@effect/platform-node"
import { SqlClient } from "@effect/sql"
import * as fs from "node:fs"

const exportToCsv = Effect.gen(function* () {
  const sql = yield* SqlClient.SqlClient

  const csvStream = sql<User>`SELECT * FROM users`.stream.pipe(
    Stream.map((user) => `${user.id},${user.name},${user.email}\n`)
  )

  // Prepend the header row, then run the stream into a file sink
  yield* Stream.concat(
    Stream.succeed("id,name,email\n"),
    csvStream
  ).pipe(
    Stream.run(NodeSink.fromWritable(
      () => fs.createWriteStream("users.csv"),
      (error) => new Error(String(error))
    ))
  )
})
```

### Batch Updates
```ts
yield* sql`SELECT * FROM users WHERE needs_update = true`.stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    sql.withTransaction(
      Effect.forEach(batch, (user) =>
        sql`UPDATE users SET processed = true WHERE id = ${user.id}`
      )
    )
  ),
  Stream.runDrain
)
```

### Real-time Processing
```ts
const processNewOrders = sql`
  SELECT * FROM orders
  WHERE status = 'pending'
  ORDER BY created_at
`.stream.pipe(
  Stream.tap((order) =>
    Effect.all([
      sendNotification(order),
      updateInventory(order),
      sql`UPDATE orders SET status = 'processing' WHERE id = ${order.id}`
    ])
  ),
  Stream.runDrain
)
```

### Parallel Processing
```ts
yield* sql`SELECT * FROM items`.stream.pipe(
  Stream.mapEffect(
    (item) => processItem(item),
    { concurrency: 10 } // Process 10 items concurrently
  ),
  Stream.runDrain
)
```

## Performance Considerations
### Memory
Streaming keeps memory usage constant regardless of result size. However, transformations that collect (like `grouped`) temporarily hold items in memory.
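As a sketch of the difference (assuming `sql` is in scope as in the examples above; `events`, `handleRow`, and `handleBatch` are hypothetical):

```ts
import { Chunk, Effect, Stream } from "effect"

// Hypothetical handlers standing in for your own logic
declare const handleRow: (row: unknown) => Effect.Effect<void>
declare const handleBatch: (rows: Chunk.Chunk<unknown>) => Effect.Effect<void>

// One row in memory at a time
const perRow = sql`SELECT * FROM events`.stream.pipe(
  Stream.mapEffect(handleRow),
  Stream.runDrain
)

// Up to 10,000 rows buffered per chunk: peak memory scales with group size
const batched = sql`SELECT * FROM events`.stream.pipe(
  Stream.grouped(10_000),
  Stream.mapEffect(handleBatch),
  Stream.runDrain
)
```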
### Connections
Streaming holds a database connection for the duration of the stream. For long-running streams, consider:
- Using a dedicated connection pool (see the sketch below)
- Processing in smaller batches
- Adding timeouts
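For the first point, one option is a small `PgClient` pool reserved for long-running streaming jobs, so they cannot starve the main application pool. A sketch, assuming `@effect/sql-pg`; the names and settings are illustrative:

```ts
import { PgClient } from "@effect/sql-pg"

// Dedicated two-connection pool for streaming jobs only
const StreamingPgLive = PgClient.layer({
  database: "app",
  maxConnections: 2
})
```

For timeouts, `Stream.timeout` ends the stream if it fails to produce a value within the given duration: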
```ts
yield* sql`SELECT * FROM huge_table`.stream.pipe(
  Stream.timeout("5 minutes"),
  Stream.runDrain
)
```

### Backpressure
Effect streams have built-in backpressure: the database won't fetch more rows until the previous batch has been processed.
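For example, with a deliberately slow consumer the client simply pulls the next batch later; a sketch assuming `sql` and `processUser` from the earlier examples:

```ts
yield* sql`SELECT * FROM users`.stream.pipe(
  // Each row takes ~50 ms extra to handle; the next batch is only
  // fetched once these effects complete, so nothing buffers without bound
  Stream.mapEffect((user) => processUser(user).pipe(Effect.delay("50 millis"))),
  Stream.runDrain
)
```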
## When Not to Stream
Don't use streaming when:
- The result set is small (typically under 1,000 rows)
- You need all results in memory anyway
- Operations require multiple passes over the data
- You're using SQLite (limited streaming support)
```ts
// Small result sets - just fetch normally
const users = yield* sql`SELECT * FROM users WHERE role = 'admin'`
```

## Next Steps
- Data Loaders - Batch related queries
- Transactions - Stream within transactions
- PostgreSQL - PostgreSQL cursor details