Read Data
Fetch data from your database with caching, rate limiting, and automatic table dependency tracking.
The Code
#[forge::query(cache = "30s")]
pub async fn get_user(ctx: &QueryContext, id: Uuid) -> Result<User> {
    sqlx::query_as("SELECT * FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(ctx.db())
        .await
        .map_err(Into::into)
}
What Happens
The macro transforms your function into a query handler with:
- Authentication enforcement (unless marked public)
- Response caching with TTL-based expiration
- Rate limiting with configurable keys
- Statement timeout at the database level
- Table dependency extraction at compile time
Cache keys are content-addressable: function name + argument hash. Same query with same arguments hits cache. Different arguments get separate entries.
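TTL-based expiration can be pictured as a map whose entries each carry an expiry time. The following is a minimal sketch of that idea, not Forge's implementation: `TtlCache`, `insert_at`, and `get_at` are hypothetical names, and the current time is passed in explicitly so the behaviour is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical TTL cache: each entry remembers when it expires.
pub struct TtlCache<V> {
    ttl: Duration,
    entries: HashMap<String, (V, Instant)>,
}

impl<V: Clone> TtlCache<V> {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Store a value that stays valid until `now + ttl`.
    pub fn insert_at(&mut self, key: &str, value: V, now: Instant) {
        self.entries.insert(key.to_string(), (value, now + self.ttl));
    }

    /// Return the cached value if it has not expired; evict it otherwise.
    pub fn get_at(&mut self, key: &str, now: Instant) -> Option<V> {
        let fresh = match self.entries.get(key) {
            Some((value, expires_at)) if now < *expires_at => Some(value.clone()),
            _ => None,
        };
        if fresh.is_none() {
            // Missing or expired: make sure no stale entry lingers.
            self.entries.remove(key);
        }
        fresh
    }
}
```

With a 30-second TTL, a lookup 10 seconds after insertion is a hit, while a lookup 31 seconds after insertion is a miss and falls through to the database.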
Attributes
| Attribute | Type | Default | Description |
|---|---|---|---|
| `public` | flag | `false` | Bypass authentication requirement |
| `cache` | `"duration"` | none | Cache TTL (e.g., `"30s"`, `"5m"`, `"1h"`) |
| `timeout` | `u64` | none | Statement timeout in seconds |
| `require_role("name")` | string | none | Require a specific role (returns 403 if missing) |
| `rate_limit(...)` | config | none | Rate limiting configuration |
| `tables` | `["t1", "t2"]` | auto | Explicit table dependencies (for dynamic SQL) |
| `log` | `"level"` | none | Log level (`trace`, `debug`, `info`, `warn`, `error`, `off`) |
Rate Limit Configuration
rate_limit(requests = 100, per = "1m", key = "user")
| Parameter | Type | Description |
|---|---|---|
| `requests` | `u32` | Number of requests allowed |
| `per` | `"duration"` | Time window (`"30s"`, `"1m"`, `"1h"`) |
| `key` | string | Bucket key: `"user"`, `"ip"`, `"tenant"`, `"global"` |
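The `cache` and `per` values are duration strings such as "30s", "1m", and "1h". As an illustration of how such strings could map onto `std::time::Duration`, here is a small sketch. `parse_duration` is a hypothetical helper, not part of Forge, and it assumes only the `s`, `m`, and `h` suffixes shown in the tables.

```rust
use std::time::Duration;

/// Hypothetical parser for duration strings like "30s", "5m", "1h".
/// Returns None for an unknown unit, a missing number, or overflow.
pub fn parse_duration(input: &str) -> Option<Duration> {
    let input = input.trim();
    // The last character is the unit; everything before it is the amount.
    let unit = input.chars().last()?;
    let digits = &input[..input.len() - unit.len_utf8()];
    let amount: u64 = digits.parse().ok()?;
    let seconds = match unit {
        's' => amount,
        'm' => amount.checked_mul(60)?,
        'h' => amount.checked_mul(3600)?,
        _ => return None,
    };
    Some(Duration::from_secs(seconds))
}
```

Under these assumptions, `per = "1m"` with `requests = 100` describes a 60-second window holding 100 requests.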
Patterns
Public Query
No authentication required. Anyone can call this endpoint.
#[forge::query(public)]
pub async fn list_products(ctx: &QueryContext) -> Result<Vec<Product>> {
    sqlx::query_as("SELECT * FROM products WHERE active = true")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
Admin-Only Query
Requires an authenticated user with the admin role.
#[forge::query(require_role("admin"))]
pub async fn get_audit_logs(ctx: &QueryContext) -> Result<Vec<AuditLog>> {
    sqlx::query_as("SELECT * FROM audit_logs ORDER BY created_at DESC LIMIT 100")
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
Cached Query with Rate Limiting
Cache results for 5 minutes. Allow 100 requests per minute per user.
#[forge::query(
    cache = "5m",
    rate_limit(requests = 100, per = "1m", key = "user")
)]
pub async fn get_dashboard_stats(ctx: &QueryContext) -> Result<DashboardStats> {
    sqlx::query_as(
        "SELECT count(*) as total, sum(amount) as revenue FROM orders WHERE created_at > now() - interval '30 days'"
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}
User-Scoped Query
Access the authenticated user ID for row-level filtering.
#[forge::query]
pub async fn get_my_orders(ctx: &QueryContext) -> Result<Vec<Order>> {
    let user_id = ctx.require_user_id()?;
    sqlx::query_as("SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC")
        .bind(user_id)
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}
Dynamic SQL with Explicit Tables
When SQL is constructed dynamically, the macro cannot extract table dependencies. Specify them explicitly for real-time subscriptions.
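#[forge::query(tables = ["users", "orders"])]
pub async fn search(ctx: &QueryContext, table: String, term: String) -> Result<Vec<Row>> {
    // `table` is interpolated into the SQL text and cannot be bound as a parameter.
    // Validate it against an allowlist (here "users" and "orders") before this point
    // to avoid SQL injection.
    let sql = format!("SELECT * FROM {} WHERE name ILIKE $1", table);
    sqlx::query_as(&sql)
        .bind(format!("%{}%", term))
        .fetch_all(ctx.db())
        .await
        .map_err(Into::into)
}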
Query with Timeout
Enforce a 10-second statement timeout. Queries that run longer are cancelled.
#[forge::query(timeout = 10)]
pub async fn expensive_report(ctx: &QueryContext) -> Result<Report> {
    sqlx::query_as(
        "SELECT * FROM transactions t
         JOIN accounts a ON t.account_id = a.id
         WHERE t.created_at > now() - interval '1 year'"
    )
    .fetch_one(ctx.db())
    .await
    .map_err(Into::into)
}
Context Methods
| Method | Return Type | Description |
|---|---|---|
| `ctx.db()` | `&PgPool` | Database connection pool |
| `ctx.auth` | `AuthContext` | Authentication context (field access) |
| `ctx.request` | `RequestMetadata` | Request metadata (field access) |
| `ctx.require_user_id()` | `Result<Uuid>` | Get user ID or error if unauthenticated |
| `ctx.require_subject()` | `Result<&str>` | Get subject claim (for non-UUID auth providers) |
| `ctx.env(key)` | `Option<String>` | Get environment variable |
| `ctx.env_or(key, default)` | `String` | Get env var with default |
| `ctx.env_require(key)` | `Result<String>` | Get required env var |
| `ctx.env_parse::<T>(key)` | `Result<T>` | Parse env var to type |
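The four environment accessors differ mainly in how they report a missing or malformed value. The sketch below mirrors the return types from the table over a plain `HashMap`. It is an assumption-laden illustration: `EnvSketch` is a hypothetical stand-in, the error type is simplified to `String`, and the real context may behave differently.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical stand-in for the context's environment access.
pub struct EnvSketch {
    vars: HashMap<String, String>,
}

impl EnvSketch {
    pub fn new(pairs: &[(&str, &str)]) -> Self {
        let vars = pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
        Self { vars }
    }

    /// Missing variables are reported as None.
    pub fn env(&self, key: &str) -> Option<String> {
        self.vars.get(key).cloned()
    }

    /// Missing variables fall back to the supplied default.
    pub fn env_or(&self, key: &str, default: &str) -> String {
        self.env(key).unwrap_or_else(|| default.to_string())
    }

    /// Missing variables become an error.
    pub fn env_require(&self, key: &str) -> Result<String, String> {
        self.env(key)
            .ok_or_else(|| format!("missing environment variable {key}"))
    }

    /// Missing or unparseable variables become an error.
    pub fn env_parse<T: FromStr>(&self, key: &str) -> Result<T, String> {
        self.env_require(key)?
            .parse::<T>()
            .map_err(|_| format!("environment variable {key} could not be parsed"))
    }
}
```

A typical use would be `env_or` for optional settings with sensible defaults and `env_parse::<u32>` for numeric settings that must be present and well-formed.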
AuthContext Methods
Access via ctx.auth:
| Method | Return Type | Description |
|---|---|---|
| `is_authenticated()` | `bool` | Check if user is authenticated |
| `user_id()` | `Option<Uuid>` | Get user ID if authenticated |
| `has_role(role)` | `bool` | Check if user has role |
| `require_role(role)` | `Result<()>` | Require role or return 403 |
| `claim(key)` | `Option<&Value>` | Get custom JWT claim |
| `roles()` | `&[String]` | Get all user roles |
| `subject()` | `Option<&str>` | Get raw subject claim |
RequestMetadata Fields
Access via ctx.request:
| Field | Type | Description |
|---|---|---|
| `request_id` | `Uuid` | Unique request identifier |
| `trace_id` | `String` | Distributed tracing ID |
| `client_ip` | `Option<String>` | Client IP address |
| `user_agent` | `Option<String>` | User agent string |
| `timestamp` | `DateTime<Utc>` | Request timestamp |
Under the Hood
Content-Addressable Caching
Cache keys combine function name and argument hash:
cache_key = hash(function_name + serialize(args))
get_user(id: "abc") and get_user(id: "xyz") have separate cache entries, so a result cached for one set of arguments is not served for a different set (barring a hash collision).
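The key formula above can be sketched with the standard library's hasher. This is an illustration only: `cache_key` is a hypothetical function, the arguments are assumed to be already serialized to a string, and the hash function Forge actually uses is not stated here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical key derivation: hash the function name together with
/// the serialized arguments.
pub fn cache_key(function_name: &str, serialized_args: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Hashing the two parts separately keeps ("ab", "c") distinct from ("a", "bc").
    function_name.hash(&mut hasher);
    serialized_args.hash(&mut hasher);
    hasher.finish()
}
```

Identical name and arguments always produce the same key within a process, which is what makes a repeated call a cache hit.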
Table Dependency Extraction
At compile time, the macro parses SQL strings using sqlparser and extracts referenced tables. This powers real-time subscriptions without runtime overhead.
// Extracted: ["users", "orders"]
sqlx::query("SELECT u.*, o.total FROM users u JOIN orders o ON o.user_id = u.id")
For dynamic SQL, use the tables attribute to specify dependencies explicitly.
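To make the idea of table extraction concrete, the sketch below collects the identifier that follows each FROM or JOIN keyword. It is deliberately naive and does not use sqlparser: a real AST walk also handles subqueries, CTEs, quoted identifiers, and comma-separated table lists, none of which this token scan attempts. `extract_tables` is a hypothetical name.

```rust
/// Naive stand-in for AST-based extraction: collect the identifier that
/// follows each FROM or JOIN keyword, in order of first appearance.
pub fn extract_tables(sql: &str) -> Vec<String> {
    let mut tables: Vec<String> = Vec::new();
    let mut expect_table = false;
    for token in sql.split_whitespace() {
        if expect_table {
            // Strip surrounding punctuation such as "," or ";".
            let name = token
                .trim_matches(|c: char| !c.is_alphanumeric() && c != '_' && c != '.')
                .to_lowercase();
            if !name.is_empty() && !tables.contains(&name) {
                tables.push(name);
            }
            expect_table = false;
        } else {
            let upper = token.to_uppercase();
            expect_table = upper == "FROM" || upper == "JOIN";
        }
    }
    tables
}
```

For the join shown above, this scan yields the same two tables, users and orders. Anything it cannot see, such as a table name assembled at runtime, is exactly the case the tables attribute covers.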
Rate Limiting
Rate limiting uses the token bucket algorithm. Each bucket (identified by key) starts full and drains with each request. Tokens refill at a rate of requests / per; for example, requests = 100 with per = "1m" refills about 1.67 tokens per second.
When exceeded, the query returns ForgeError::RateLimitExceeded with:
- retry_after: seconds until next request allowed
- limit: configured limit
- remaining: tokens remaining (0 when exceeded)
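The token bucket behaviour described above can be sketched as follows. This is a minimal, single-bucket illustration rather than Forge's implementation: `TokenBucket` and `try_acquire` are hypothetical names, one request is assumed to cost one token, and the current time is passed in explicitly so the example is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical single bucket; a real limiter would keep one per key.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Allow `requests` per `per` window; the bucket starts full.
    pub fn new(requests: u32, per: Duration, now: Instant) -> Self {
        let capacity = f64::from(requests);
        Self {
            capacity,
            tokens: capacity,
            refill_per_sec: capacity / per.as_secs_f64(),
            last_refill: now,
        }
    }

    /// Take one token. Ok carries the tokens remaining afterwards;
    /// Err carries the seconds to wait until one token is available.
    pub fn try_acquire(&mut self, now: Instant) -> Result<u32, f64> {
        // Refill for the time elapsed since the last call, capped at capacity.
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(self.tokens as u32)
        } else {
            Err((1.0 - self.tokens) / self.refill_per_sec)
        }
    }
}
```

In this sketch the `Err` value plays the role of retry_after, and the `Ok` value plays the role of remaining.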
Read Replica Routing
When read replicas are configured, queries automatically route to the replica pool. Round-robin distribution via atomic counter balances load across replicas.
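Round-robin selection with an atomic counter needs no lock: each caller increments the counter and takes the result modulo the number of replicas. The sketch below shows the pattern under stated assumptions. `ReplicaPool` and `pick` are hypothetical names, and the replicas are plain values rather than real connection pools.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical replica set with lock-free round-robin selection.
pub struct ReplicaPool<T> {
    replicas: Vec<T>,
    next: AtomicUsize,
}

impl<T> ReplicaPool<T> {
    pub fn new(replicas: Vec<T>) -> Self {
        Self { replicas, next: AtomicUsize::new(0) }
    }

    /// Pick the next replica in rotation; None when no replicas are configured.
    pub fn pick(&self) -> Option<&T> {
        if self.replicas.is_empty() {
            return None;
        }
        // fetch_add returns the previous value and wraps on overflow,
        // so the modulo keeps cycling through the replicas.
        let index = self.next.fetch_add(1, Ordering::Relaxed) % self.replicas.len();
        self.replicas.get(index)
    }
}
```

Relaxed ordering is enough here because the counter only spreads load and does not guard any other memory.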
Testing
Use TestQueryContext to test queries in isolation.
#[test]
fn test_get_user() {
    let ctx = TestQueryContext::authenticated(Uuid::new_v4());

    // Test with roles
    let ctx = TestQueryContext::builder()
        .as_user(Uuid::new_v4())
        .with_role("admin")
        .build();

    // Test with custom claims
    let ctx = TestQueryContext::builder()
        .as_user(Uuid::new_v4())
        .with_claim("org_id", json!("org-123"))
        .build();

    // Test with environment variables
    let ctx = TestQueryContext::builder()
        .with_env("API_KEY", "test_key")
        .build();
}
With Database
For integration tests, pass a database pool:
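#[tokio::test]
async fn test_with_database() {
    let pool = setup_test_database().await;
    // Insert a user with this ID before querying; otherwise `fetch_one`
    // returns an error and the assertion below fails.
    let user_id = Uuid::new_v4();
    let ctx = TestQueryContext::with_pool(pool, Some(user_id));
    let result = get_user(&ctx, user_id).await;
    assert!(result.is_ok());
}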
Environment Variable Verification
Verify which environment variables your query accessed:
#[test]
fn test_env_access() {
    let ctx = TestQueryContext::builder()
        .with_env("EXTERNAL_API_URL", "https://api.example.com")
        .build();

    // Call your query...

    ctx.env_mock().assert_accessed("EXTERNAL_API_URL");
}