Read Data

Fetch data from your database with caching, rate limiting, and automatic table dependency tracking.

The Code

#[forge::query(cache = "30s")]
pub async fn get_user(ctx: &QueryContext, id: Uuid) -> Result<User> {
    sqlx::query_as("SELECT * FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}

What Happens

The macro transforms your function into a query handler with:

  • Authentication enforcement (unless marked public)
  • Response caching with TTL-based expiration
  • Rate limiting with configurable keys
  • Statement timeout at the database level
  • Table dependency extraction at compile time

Cache keys are content-addressable: function name + argument hash. The same query with the same arguments hits the cache; different arguments get separate entries.
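
To make the TTL-based expiration concrete, here is a minimal in-memory sketch. It is not Forge's implementation: `TtlCache`, `put`, and `get` are hypothetical names, values are plain strings, and the current time is passed in explicitly so the behavior is easy to reason about.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory cache; entries expire `ttl` after insertion.
pub struct TtlCache {
    ttl: Duration,
    entries: HashMap<String, (Instant, String)>,
}

impl TtlCache {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Store `value` under `key`, stamped with the insertion time `now`.
    pub fn put(&mut self, key: &str, value: &str, now: Instant) {
        self.entries.insert(key.to_owned(), (now, value.to_owned()));
    }

    /// Return the cached value if it is younger than the TTL; evict it otherwise.
    pub fn get(&mut self, key: &str, now: Instant) -> Option<String> {
        let fresh = match self.entries.get(key) {
            Some((stored_at, _)) => now.duration_since(*stored_at) < self.ttl,
            None => return None,
        };
        if fresh {
            self.entries.get(key).map(|(_, value)| value.clone())
        } else {
            self.entries.remove(key);
            None
        }
    }
}
```

With a 30-second TTL, a lookup 29 seconds after insertion is a hit, and a lookup at 30 seconds or later is a miss that also evicts the stale entry.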

Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| public | flag | false | Bypass authentication requirement |
| cache | "duration" | none | Cache TTL (e.g., "30s", "5m", "1h") |
| timeout | u64 | none | Statement timeout in seconds |
| require_role("name") | string | none | Require specific role (returns 403 if missing) |
| rate_limit(...) | config | none | Rate limiting configuration |
| tables | ["t1", "t2"] | auto | Explicit table dependencies (for dynamic SQL) |
| log | "level" | none | Log level (trace, debug, info, warn, error, off) |

Rate Limit Configuration

rate_limit(requests = 100, per = "1m", key = "user")

| Parameter | Type | Description |
|---|---|---|
| requests | u32 | Number of requests allowed |
| per | "duration" | Time window ("30s", "1m", "1h") |
| key | string | Bucket key: "user", "ip", "tenant", "global" |
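
Both `cache` and `per` take duration strings. The sketch below shows one way such strings could be turned into a `std::time::Duration`. The `parse_duration` helper is hypothetical and handles only the three suffixes that appear in the examples here (`s`, `m`, `h`); the formats Forge actually accepts may differ.

```rust
use std::time::Duration;

/// Parse strings such as "30s", "5m", or "1h" into a `Duration`.
/// Hypothetical helper: anything else (including an empty string) yields `None`.
pub fn parse_duration(input: &str) -> Option<Duration> {
    let input = input.trim();
    // The last character is the unit; everything before it is the count.
    let unit = input.chars().last()?;
    let number = &input[..input.len() - unit.len_utf8()];
    let value: u64 = number.parse().ok()?;
    let seconds_per_unit = match unit {
        's' => 1,
        'm' => 60,
        'h' => 3600,
        _ => return None,
    };
    // checked_mul avoids overflow on absurdly large inputs.
    value.checked_mul(seconds_per_unit).map(Duration::from_secs)
}
```

Under these assumptions, `parse_duration("5m")` yields a 300-second duration, while an unknown suffix such as `"10x"` yields `None`.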

Patterns

Public Query

No authentication required. Anyone can call this endpoint.

#[forge::query(public)]
pub async fn list_products(ctx: &QueryContext) -> Result<Vec<Product>> {
    sqlx::query_as("SELECT * FROM products WHERE active = true")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

Admin-Only Query

Requires an authenticated user with the admin role.

#[forge::query(require_role("admin"))]
pub async fn get_audit_logs(ctx: &QueryContext) -> Result<Vec<AuditLog>> {
    sqlx::query_as("SELECT * FROM audit_logs ORDER BY created_at DESC LIMIT 100")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

Cached Query with Rate Limiting

Cache results for 5 minutes. Allow 100 requests per minute per user.

#[forge::query(
    cache = "5m",
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_dashboard_stats(ctx: &QueryContext) -> Result<DashboardStats> {
    sqlx::query_as(
        "SELECT count(*) as total, sum(amount) as revenue FROM orders WHERE created_at > now() - interval '30 days'"
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}

User-Scoped Query

Access the authenticated user ID for row-level filtering.

#[forge::query]
pub async fn get_my_orders(ctx: &QueryContext) -> Result<Vec<Order>> {
    let user_id = ctx.require_user_id()?;

    sqlx::query_as("SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC")
        .bind(user_id)
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}

Dynamic SQL with Explicit Tables

When SQL is constructed dynamically, the macro cannot extract table dependencies. Specify them explicitly for real-time subscriptions.

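#[forge::query(tables = ["users", "orders"])]
pub async fn search(ctx: &QueryContext, table: String, term: String) -> Result<Vec<Row>> {
    // Table names cannot be bound as parameters, so interpolate only
    // names from a fixed allow-list to avoid SQL injection.
    let table = match table.as_str() {
        "users" => "users",
        "orders" => "orders",
        // Unknown table: return no rows (or map this to your own error type).
        _ => return Ok(Vec::new()),
    };
    let sql = format!("SELECT * FROM {} WHERE name ILIKE $1", table);
    sqlx::query_as(&sql)
        .bind(format!("%{}%", term))
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}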

Query with Timeout

Enforce a 10-second statement timeout. Queries that run longer are cancelled.

#[forge::query(timeout = 10)]
pub async fn expensive_report(ctx: &QueryContext) -> Result<Report> {
    sqlx::query_as(
        "SELECT * FROM transactions t
         JOIN accounts a ON t.account_id = a.id
         WHERE t.created_at > now() - interval '1 year'"
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}

Context Methods

| Method | Return Type | Description |
|---|---|---|
| ctx.db() | &PgPool | Database connection pool |
| ctx.auth | AuthContext | Authentication context (field access) |
| ctx.request | RequestMetadata | Request metadata (field access) |
| ctx.require_user_id() | Result<Uuid> | Get user ID or error if unauthenticated |
| ctx.require_subject() | Result<&str> | Get subject claim (for non-UUID auth providers) |
| ctx.env(key) | Option<String> | Get environment variable |
| ctx.env_or(key, default) | String | Get env var with default |
| ctx.env_require(key) | Result<String> | Get required env var |
| ctx.env_parse::<T>(key) | Result<T> | Parse env var to type |

AuthContext Methods

Access via ctx.auth:

| Method | Return Type | Description |
|---|---|---|
| is_authenticated() | bool | Check if user is authenticated |
| user_id() | Option<Uuid> | Get user ID if authenticated |
| has_role(role) | bool | Check if user has role |
| require_role(role) | Result<()> | Require role or return 403 |
| claim(key) | Option<&Value> | Get custom JWT claim |
| roles() | &[String] | Get all user roles |
| subject() | Option<&str> | Get raw subject claim |

RequestMetadata Fields

Access via ctx.request:

| Field | Type | Description |
|---|---|---|
| request_id | Uuid | Unique request identifier |
| trace_id | String | Distributed tracing ID |
| client_ip | Option<String> | Client IP address |
| user_agent | Option<String> | User agent string |
| timestamp | DateTime<Utc> | Request timestamp |

Under the Hood

Content-Addressable Caching

Cache keys combine function name and argument hash:

cache_key = hash(function_name + serialize(args))

get_user(id: "abc") and get_user(id: "xyz") have separate cache entries. Because the arguments are part of the key, a cached result is only returned for the exact arguments that produced it (barring a hash collision).
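
The key derivation can be sketched with the standard library alone. This is an illustration, not Forge's code: the `cache_key` function is hypothetical, the arguments are assumed to be already serialized to a string, and `DefaultHasher` stands in for whatever hash the framework uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Derive a cache key from the function name and its serialized arguments.
/// `serialized_args` stands in for whatever serialization the framework uses.
pub fn cache_key(function_name: &str, serialized_args: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Hashing the two parts separately keeps ("ab", "c") distinct from ("a", "bc").
    function_name.hash(&mut hasher);
    serialized_args.hash(&mut hasher);
    hasher.finish()
}
```

Calling `cache_key` twice with the same name and arguments gives the same key, so the second call can be served from the cache; changing either the function name or any argument produces a different key.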

Table Dependency Extraction

At compile time, the macro parses SQL strings using sqlparser and extracts referenced tables. This powers real-time subscriptions without runtime overhead.

// Extracted: ["users", "orders"]
sqlx::query("SELECT u.*, o.total FROM users u JOIN orders o ON o.user_id = u.id")

For dynamic SQL, use the tables attribute to specify dependencies explicitly.
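
To illustrate what table extraction produces, here is a deliberately naive stand-in that uses only the standard library. It does not use sqlparser and is not how the macro works internally: the hypothetical `extract_tables` function simply collects the identifier after each FROM or JOIN keyword, so it ignores subqueries, CTEs, and quoted names, and can be fooled by expressions such as `EXTRACT(year FROM ...)`.

```rust
/// Naive stand-in for a real SQL parser: collects the identifier that follows
/// each FROM or JOIN keyword, lowercased and de-duplicated in first-seen order.
pub fn extract_tables(sql: &str) -> Vec<String> {
    let mut tables: Vec<String> = Vec::new();
    let mut expect_table = false;
    for token in sql.split_whitespace() {
        if expect_table {
            expect_table = false;
            // Keep only identifier characters; this drops trailing commas or parentheses.
            let name = token
                .chars()
                .take_while(|c| c.is_alphanumeric() || *c == '_' || *c == '.')
                .collect::<String>()
                .to_lowercase();
            if !name.is_empty() && !tables.contains(&name) {
                tables.push(name);
            }
            continue;
        }
        let upper = token.to_uppercase();
        expect_table = upper == "FROM" || upper == "JOIN";
    }
    tables
}
```

For the join query shown above, this sketch returns `users` and `orders`, matching the extracted list in the comment. A real parser is needed for anything beyond simple SELECT statements.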

Rate Limiting

Rate limiting uses the token bucket algorithm. Each bucket (identified by key) starts full, and each request removes one token. Tokens refill at a rate of requests / per; for example, requests = 100 with per = "1m" refills roughly 1.67 tokens per second.

When exceeded, the query returns ForgeError::RateLimitExceeded with:

  • retry_after: seconds until next request allowed
  • limit: configured limit
  • remaining: tokens remaining (0 when exceeded)
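
The token bucket behavior described above can be sketched as follows. This is an assumption-laden illustration rather than Forge's implementation: `TokenBucket` and this `RateLimitExceeded` struct are hypothetical (the real error is a `ForgeError` variant), and time is passed in as seconds so the example is deterministic.

```rust
/// Hypothetical error payload mirroring the three fields listed above.
#[derive(Debug)]
pub struct RateLimitExceeded {
    pub retry_after: f64,
    pub limit: u32,
    pub remaining: u32,
}

/// Hypothetical token bucket; `now` is a timestamp in seconds.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: f64,
}

impl TokenBucket {
    /// Allow `requests` per `per_secs` seconds; the bucket starts full.
    pub fn new(requests: u32, per_secs: f64, now: f64) -> Self {
        Self {
            capacity: requests as f64,
            tokens: requests as f64,
            refill_per_sec: requests as f64 / per_secs,
            last_refill: now,
        }
    }

    /// Take one token, or report how long to wait for the next one.
    pub fn try_acquire(&mut self, now: f64) -> Result<(), RateLimitExceeded> {
        // Refill in proportion to elapsed time, never above capacity.
        let elapsed = (now - self.last_refill).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            Err(RateLimitExceeded {
                retry_after: (1.0 - self.tokens) / self.refill_per_sec,
                limit: self.capacity as u32,
                remaining: 0,
            })
        }
    }
}
```

With 2 requests per 4 seconds, two immediate calls succeed, a third is rejected with a `retry_after` of 2 seconds, and a call made 2 seconds later succeeds again.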

Read Replica Routing

When read replicas are configured, queries automatically route to the replica pool. Round-robin distribution via atomic counter balances load across replicas.
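
Round-robin selection with an atomic counter can be sketched like this. `ReplicaRouter` and `pick` are hypothetical names, and the generic `T` stands in for a connection pool; Forge's actual routing code may be structured differently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical replica selector; `T` stands in for a connection pool.
pub struct ReplicaRouter<T> {
    replicas: Vec<T>,
    next: AtomicUsize,
}

impl<T> ReplicaRouter<T> {
    pub fn new(replicas: Vec<T>) -> Self {
        Self { replicas, next: AtomicUsize::new(0) }
    }

    /// Pick the next replica in rotation, or `None` if no replicas are configured.
    pub fn pick(&self) -> Option<&T> {
        if self.replicas.is_empty() {
            return None;
        }
        // fetch_add needs only `&self` and wraps on overflow, so concurrent
        // callers can share one router without a lock.
        let n = self.next.fetch_add(1, Ordering::Relaxed);
        self.replicas.get(n % self.replicas.len())
    }
}
```

Because the counter is atomic, each concurrent caller receives a distinct counter value, which spreads consecutive queries evenly across the replicas.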

Testing

Use TestQueryContext to test queries in isolation.

#[test]
fn test_get_user() {
    let ctx = TestQueryContext::authenticated(Uuid::new_v4());

    // Test with roles
    let ctx = TestQueryContext::builder()
        .as_user(Uuid::new_v4())
        .with_role("admin")
        .build();

    // Test with custom claims
    let ctx = TestQueryContext::builder()
        .as_user(Uuid::new_v4())
        .with_claim("org_id", json!("org-123"))
        .build();

    // Test with environment variables
    let ctx = TestQueryContext::builder()
        .with_env("API_KEY", "test_key")
        .build();
}

With Database

For integration tests, pass a database pool:

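#[tokio::test]
async fn test_with_database() {
    let pool = setup_test_database().await;
    // `user_id` must match a row seeded by `setup_test_database`;
    // the random UUID here is only a placeholder.
    let user_id = Uuid::new_v4();
    let ctx = TestQueryContext::with_pool(pool, Some(user_id));

    let result = get_user(&ctx, user_id).await;
    assert!(result.is_ok());
}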

Environment Variable Verification

Verify which environment variables your query accessed:

#[test]
fn test_env_access() {
    let ctx = TestQueryContext::builder()
        .with_env("EXTERNAL_API_URL", "https://api.example.com")
        .build();

    // Call your query...

    ctx.env_mock().assert_accessed("EXTERNAL_API_URL");
}