Runtime Agent Stack
Status: Draft
Purpose: Patterns for AI features in running applications (agents that participate at runtime)
The Problem
When your app calls LLMs at runtime, you need:
- API Layer — How to call LLM services (streaming, retry, timeout)
- Security — API keys, rate limiting, input validation, prompt injection defense
- Validation — Ensuring AI output is safe to render
- Observability — Tracing, metrics, cost tracking
- Fallbacks — What happens when AI fails
Without these, you get insecure, unreliable, expensive AI features.
Reference Implementations
| Project | Pattern | What It Solves |
|---|---|---|
| Ask Dad | RAG + rate limiting | Knowledge-grounded Q&A with abuse prevention |
| Commerce Prompt Analyzer | Multi-model council | Parallel LLM queries with cost guards |
| SIX | Streaming + validation | Real-time AI-generated layouts with Zod validation |
Pattern 1: AI Service Layer
Source: All projects separate AI logic from handlers
internal/
├── handler/ # HTTP handlers (thin)
│ └── chat.go
├── service/ # Business logic
│ └── chat.go
└── ai/ # AI-specific code
├── client.go # LLM API wrapper
├── prompts.go # System prompts
└── validation.go # Output validation
Forge Opinion:
AI code lives in internal/ai/, not scattered in handlers:
// internal/ai/client.go
type Client struct {
anthropic *anthropic.Client
model string
maxTokens int
}
func (c *Client) Complete(ctx context.Context, prompt string) (string, error) {
// Timeout, retry, error handling
}
func (c *Client) Stream(ctx context.Context, prompt string) (<-chan string, error) {
// Streaming with SSE compatibility
}
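The stubs above hide the interesting part. Here is a minimal sketch of Complete with a timeout and one retry, assuming the doComplete helper that Pattern 8 also uses for the raw API call:

// internal/ai/client.go (sketch)
func (c *Client) Complete(ctx context.Context, prompt string) (string, error) {
	// Bound every call: a hung LLM request must not hang the handler
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var lastErr error
	for attempt := 0; attempt < 2; attempt++ { // one retry
		if attempt > 0 {
			select { // linear backoff, abandoned if the context expires
			case <-ctx.Done():
				return "", ctx.Err()
			case <-time.After(time.Duration(attempt) * time.Second):
			}
		}
		result, err := c.doComplete(ctx, prompt)
		if err == nil {
			return result.Text, nil
		}
		lastErr = err
	}
	return "", fmt.Errorf("completion failed after retries: %w", lastErr)
}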
Pattern 2: Streaming Responses
Source: SIX (six/src/app/api/v4/stream/route.ts)
Stream AI responses via SSE for real-time UX:
// internal/handler/chat.go
func (h *Handler) StreamChat(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}
	prompt := r.URL.Query().Get("q")
	stream, err := h.ai.Stream(r.Context(), prompt)
	if err != nil {
		http.Error(w, "AI service unavailable", http.StatusBadGateway)
		return
	}
	for chunk := range stream {
		fmt.Fprintf(w, "data: %s\n\n", chunk)
		flusher.Flush()
	}
}
HTMX Integration:
<div hx-ext="sse"
sse-connect="/api/chat/stream?q=..."
sse-swap="message"
hx-swap="beforeend">
</div>
Pattern 3: Rate Limiting
Source: Ask Dad (in-memory), SIX (Redis)
In-Memory (Simple, single instance):
// internal/middleware/ratelimit.go
type RateLimiter struct {
	mu      sync.Mutex // Allow only ever writes, so a plain Mutex suffices
	buckets map[string]*bucket
}
type bucket struct {
tokens int
lastReset time.Time
}
func (rl *RateLimiter) Allow(ip string, limit int, window time.Duration) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
b, exists := rl.buckets[ip]
if !exists || time.Since(b.lastReset) > window {
rl.buckets[ip] = &bucket{tokens: limit - 1, lastReset: time.Now()}
return true
}
if b.tokens <= 0 {
return false
}
b.tokens--
return true
}
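Buckets accumulate one entry per IP and are never removed. A minimal janitor sketch, assuming the RateLimiter above plus a hypothetical NewRateLimiter constructor:

// NewRateLimiter starts a background janitor that evicts stale buckets.
func NewRateLimiter(window time.Duration) *RateLimiter {
	rl := &RateLimiter{buckets: make(map[string]*bucket)}
	go func() {
		for range time.Tick(window) {
			rl.mu.Lock()
			for ip, b := range rl.buckets {
				if time.Since(b.lastReset) > window {
					delete(rl.buckets, ip)
				}
			}
			rl.mu.Unlock()
		}
	}()
	return rl
}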
Redis (Distributed, multi-instance):
// internal/middleware/ratelimit_redis.go
func (rl *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
	// Sliding window using Redis sorted sets
	now := time.Now().UnixMilli()
	windowStart := now - window.Milliseconds()
	pipe := rl.redis.Pipeline()
	pipe.ZRemRangeByScore(ctx, key, "0", strconv.FormatInt(windowStart, 10))
	countCmd := pipe.ZCard(ctx, key)
	pipe.ZAdd(ctx, key, redis.Z{Score: float64(now), Member: now})
	pipe.Expire(ctx, key, window)
	if _, err := pipe.Exec(ctx); err != nil {
		return false, err
	}
	// Requests already in the window, before this one
	return countCmd.Val() < int64(limit), nil
}
Forge Opinion:
| Deployment | Rate Limiter |
|---|---|
| Single instance | In-memory with cleanup goroutine |
| Multi-instance | Redis sliding window |
| Fly.io | Redis (Upstash) or in-memory per region |
Limits per endpoint:
| Endpoint | Limit | Rationale |
|---|---|---|
| Chat/completion | 10/min | Prevent abuse |
| Streaming | 5/min | Higher cost |
| Image generation | 2/min | Highest cost |
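A sketch of enforcing these limits as stdlib middleware, assuming the in-memory RateLimiter above and a hypothetical clientIP helper that respects proxy headers:

// internal/middleware/ratelimit.go (sketch)
func (rl *RateLimiter) Middleware(limit int, window time.Duration) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if !rl.Allow(clientIP(r), limit, window) { // clientIP: hypothetical helper
				http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

// Wiring the per-endpoint limits from the table, e.g. in main():
//   mux.Handle("/api/chat", rl.Middleware(10, time.Minute)(chatHandler))
//   mux.Handle("/api/chat/stream", rl.Middleware(5, time.Minute)(streamHandler))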
Pattern 4: Prompt Injection Defense
Source: Ask Dad (ask-dad/src/lib/security.ts)
Detect and block common prompt injection patterns:
// internal/ai/security.go
var injectionPatterns = []string{
`(?i)ignore\s+(all\s+)?previous`,
`(?i)disregard\s+(all\s+)?instructions`,
`(?i)forget\s+(everything|all)`,
`(?i)you\s+are\s+now`,
`(?i)pretend\s+(to\s+be|you('re|\s+are))`,
`(?i)act\s+as\s+(if|a)`,
`(?i)jailbreak`,
`(?i)DAN\s+mode`,
`(?i)\bsystem\s*:\s*`,
`(?i)\bassistant\s*:\s*`,
`(?i)override\s+(safety|instructions)`,
}
// Compile once at init; MustCompile inside the hot path is wasteful
var injectionRegexps = func() []*regexp.Regexp {
	res := make([]*regexp.Regexp, len(injectionPatterns))
	for i, p := range injectionPatterns {
		res[i] = regexp.MustCompile(p)
	}
	return res
}()

func DetectInjection(input string) (bool, string) {
	for i, re := range injectionRegexps {
		if re.MatchString(input) {
			return true, injectionPatterns[i]
		}
	}
	return false, ""
}
func SanitizeInput(input string) string {
	// Remove null bytes and control characters (keep newlines and tabs)
	input = strings.Map(func(r rune) rune {
		if r == 0 || (unicode.IsControl(r) && r != '\n' && r != '\t') {
			return -1
		}
		return r
	}, input)
	// Trim and limit length, cutting on a rune boundary
	input = strings.TrimSpace(input)
	if len(input) > 12000 {
		cut := 12000
		for cut > 0 && !utf8.RuneStart(input[cut]) {
			cut--
		}
		input = input[:cut]
	}
	return input
}
Forge Opinion:
- Always sanitize user input before including it in prompts
- Log, but don't block, merely suspicious patterns (reduces false positives)
- Hard block obvious attacks (jailbreak, DAN mode)
- Limit length to prevent context stuffing (see the sketch below)
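A minimal sketch combining these rules into one entry point, assuming the helpers above; the hard-block subset and the sentinel error are assumptions:

// internal/ai/security.go (sketch)
var hardBlockPatterns = map[string]bool{ // unambiguous attacks only
	`(?i)jailbreak`:  true,
	`(?i)DAN\s+mode`: true,
}

func PrepareInput(raw, sessionID string) (string, error) {
	input := SanitizeInput(raw)
	if matched, pattern := DetectInjection(input); matched {
		// Log everything suspicious; block only the obvious attacks
		log.Warn().
			Str("session_id", sessionID).
			Str("pattern", pattern).
			Msg("possible prompt injection")
		if hardBlockPatterns[pattern] {
			return "", ErrSuspiciousInput // hypothetical sentinel error
		}
	}
	return input, nil
}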
Pattern 5: Output Validation
Source: SIX (six/src/lib/a2ui/widget-specs.ts)
Validate AI output before rendering:
// internal/ai/validation.go
type LayoutResponse struct {
Components []Component `json:"components" validate:"required,dive"`
}
type Component struct {
Type string `json:"type" validate:"required,oneof=product-card product-grid hero-banner"`
ProductID string `json:"productId,omitempty" validate:"omitempty,uuid"`
Title string `json:"title,omitempty" validate:"omitempty,max=200"`
Props map[string]any `json:"props,omitempty"`
}
func ValidateLayout(data []byte) (*LayoutResponse, error) {
var layout LayoutResponse
if err := json.Unmarshal(data, &layout); err != nil {
return nil, fmt.Errorf("invalid JSON: %w", err)
}
validate := validator.New()
if err := validate.Struct(layout); err != nil {
return nil, fmt.Errorf("validation failed: %w", err)
}
// Additional business rules
for _, c := range layout.Components {
if !isRegisteredComponent(c.Type) {
return nil, fmt.Errorf("unknown component type: %s", c.Type)
}
}
return &layout, nil
}
Forge Opinion:
- Schema validation for structured output (go-playground/validator)
- Component whitelist — reject unknown component types (see the sketch below)
- ID validation — verify referenced IDs exist in database
- Size limits — cap arrays, string lengths
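ValidateLayout calls isRegisteredComponent without defining it. A minimal sketch, assuming the whitelist is a package-level map kept in sync with the renderer:

// internal/ai/validation.go (sketch)
var registeredComponents = map[string]bool{
	"product-card": true,
	"product-grid": true,
	"hero-banner":  true,
}

func isRegisteredComponent(t string) bool {
	return registeredComponents[t]
}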
Pattern 6: Cost Management
Source: Commerce Prompt Analyzer (commerce-prompt-analyzer/src/services/openRouterService.ts)
Track and limit AI spending:
// internal/ai/cost.go
type CostTracker struct {
mu sync.RWMutex
sessionCosts map[string]float64
}
var modelCosts = map[string]struct{ input, output float64 }{
"claude-3-5-sonnet": {0.003, 0.015}, // per 1K tokens
"gpt-4o": {0.005, 0.015},
"claude-3-haiku": {0.00025, 0.00125},
}
func (ct *CostTracker) EstimateCost(model string, inputTokens, outputTokens int) float64 {
	costs, ok := modelCosts[model]
	if !ok {
		return 0 // Unknown model: no pricing data; log this rather than silently treat as free
	}
	return (float64(inputTokens)/1000)*costs.input +
		(float64(outputTokens)/1000)*costs.output
}
func (ct *CostTracker) CheckBudget(sessionID string, estimated float64) error {
ct.mu.RLock()
current := ct.sessionCosts[sessionID]
ct.mu.RUnlock()
if current+estimated > 10.0 { // $10 session limit
return ErrBudgetExceeded
}
return nil
}
func (ct *CostTracker) RecordCost(sessionID string, cost float64) {
ct.mu.Lock()
ct.sessionCosts[sessionID] += cost
ct.mu.Unlock()
}
Forge Opinion:
| Limit Type | Threshold | Action |
|---|---|---|
| Per-request | $0.10 | Reject |
| Per-session | $5.00 | Warn |
| Per-session | $10.00 | Hard block |
| Per-minute | $0.50 | Rate limit |
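A sketch of wiring these thresholds into the request path, assuming the CostTracker above plus the Client from Pattern 1; estimateTokens, the costs field, and the per-request sentinel error are assumptions:

// internal/ai/cost.go (sketch)
func (c *Client) CompleteWithBudget(ctx context.Context, sessionID, prompt string) (string, error) {
	// Pre-flight estimate; exact token counts arrive with the response
	inTokens := estimateTokens(prompt) // hypothetical, e.g. len(prompt)/4 heuristic
	estimated := c.costs.EstimateCost(c.model, inTokens, c.maxTokens)
	if estimated > 0.10 {
		return "", ErrRequestTooExpensive // hypothetical: per-request cap from the table
	}
	if err := c.costs.CheckBudget(sessionID, estimated); err != nil {
		return "", err // per-session hard block
	}
	result, err := c.doComplete(ctx, prompt)
	if err != nil {
		return "", err
	}
	c.costs.RecordCost(sessionID, result.Cost) // record actual, not estimated
	return result.Text, nil
}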
Pattern 7: Fallback Strategies
Source: Ask Dad, SIX (graceful degradation)
// internal/ai/fallback.go
func (c *Client) CompleteWithFallback(ctx context.Context, prompt string) (string, error) {
// Try primary model
result, err := c.complete(ctx, c.primaryModel, prompt)
if err == nil {
return result, nil
}
log.Warn().Err(err).Msg("primary model failed, trying fallback")
// Try fallback model (cheaper, faster)
result, err = c.complete(ctx, c.fallbackModel, prompt)
if err == nil {
return result, nil
}
log.Error().Err(err).Msg("fallback model failed")
// Return cached/default response
return c.getCachedResponse(prompt)
}
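getCachedResponse is left undefined above. A minimal sketch, assuming the Client carries a small in-memory cache (the cache and cacheMu fields are assumptions):

// internal/ai/fallback.go (sketch)
func (c *Client) getCachedResponse(prompt string) (string, error) {
	key := fmt.Sprintf("%x", sha256.Sum256([]byte(prompt)))
	c.cacheMu.RLock()
	defer c.cacheMu.RUnlock()
	if resp, ok := c.cache[key]; ok {
		return resp, nil
	}
	// Nothing cached: degrade to a safe default instead of erroring
	return "Sorry, I can't answer right now. Please try again shortly.", nil
}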
For UI features:
func (h *Handler) GetRecommendations(w http.ResponseWriter, r *http.Request) {
	recs, err := h.ai.GetRecommendations(r.Context(), userID) // userID: from the authenticated session
if err != nil {
log.Warn().Err(err).Msg("AI recommendations failed")
// Fall back to non-AI recommendations
recs = h.service.GetPopularItems(r.Context())
}
h.render(w, "recommendations", recs)
}
Forge Opinion:
| Failure | Fallback |
|---|---|
| Primary model timeout | Try faster/cheaper model |
| All models fail | Return cached response |
| AI feature fails | Degrade to non-AI version |
| Validation fails | Reject and retry once |
Pattern 8: Observability
Source: SIX (six/src/app/api/v4/stream/route.ts)
Track AI performance and costs:
// internal/ai/metrics.go
type Metrics struct {
RequestCount prometheus.Counter
RequestDuration prometheus.Histogram
TokensUsed prometheus.Counter
CostTotal prometheus.Counter
ErrorCount prometheus.Counter
}
func (c *Client) Complete(ctx context.Context, prompt string) (string, error) {
start := time.Now()
result, err := c.doComplete(ctx, prompt)
duration := time.Since(start)
c.metrics.RequestDuration.Observe(duration.Seconds())
c.metrics.RequestCount.Inc()
if err != nil {
c.metrics.ErrorCount.Inc()
return "", err
}
c.metrics.TokensUsed.Add(float64(result.Usage.TotalTokens))
c.metrics.CostTotal.Add(result.Cost)
return result.Text, nil
}
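These fields need to be constructed and registered somewhere. A minimal sketch using client_golang's promauto helpers (the metric names are assumptions):

// internal/ai/metrics.go (sketch)
import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func NewMetrics() *Metrics {
	return &Metrics{
		RequestCount: promauto.NewCounter(prometheus.CounterOpts{
			Name: "ai_requests_total", Help: "Total AI completion requests."}),
		RequestDuration: promauto.NewHistogram(prometheus.HistogramOpts{
			Name: "ai_request_duration_seconds", Help: "AI request latency.",
			Buckets: prometheus.DefBuckets}),
		TokensUsed: promauto.NewCounter(prometheus.CounterOpts{
			Name: "ai_tokens_total", Help: "Total tokens consumed."}),
		CostTotal: promauto.NewCounter(prometheus.CounterOpts{
			Name: "ai_cost_usd_total", Help: "Cumulative AI spend in USD."}),
		ErrorCount: promauto.NewCounter(prometheus.CounterOpts{
			Name: "ai_errors_total", Help: "Failed AI requests."}),
	}
}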
Structured Logging:
log.Info().
Str("model", model).
Int("input_tokens", usage.InputTokens).
Int("output_tokens", usage.OutputTokens).
Float64("cost_usd", cost).
Dur("latency", duration).
Str("session_id", sessionID).
Msg("AI completion")
Forge Opinion:
Log every AI call with:
- Model used
- Token counts (input/output)
- Cost (calculated)
- Latency
- Session/user ID (for debugging)
- Error (if any)
Pattern 9: Multi-Provider Abstraction
Source: Commerce Prompt Analyzer, SIX
Don't lock into one provider:
// internal/ai/provider.go
type Provider interface {
Complete(ctx context.Context, prompt string, opts Options) (*Response, error)
Stream(ctx context.Context, prompt string, opts Options) (<-chan Chunk, error)
Name() string
}
type Options struct {
MaxTokens int
Temperature float64
Model string
}
type Response struct {
Text string
Usage Usage
Model string
Cost float64
}
// Implementations
type AnthropicProvider struct { ... }
type OpenAIProvider struct { ... }
type GoogleProvider struct { ... }
// Router
type Router struct {
providers map[string]Provider
primary string
fallback string
}
func (r *Router) Complete(ctx context.Context, prompt string, opts Options) (*Response, error) {
	provider, ok := r.providers[r.primary]
	if !ok {
		return nil, fmt.Errorf("unknown primary provider: %s", r.primary)
	}
	resp, err := provider.Complete(ctx, prompt, opts)
	if err != nil && r.fallback != "" {
		log.Warn().Err(err).Str("provider", provider.Name()).Msg("primary provider failed")
		return r.providers[r.fallback].Complete(ctx, prompt, opts)
	}
	return resp, err
}
Forge Opinion:
- Abstract providers behind interface
- Configure primary + fallback in environment (see the sketch below)
- Don't hardcode model names in business logic
- Log which provider was used
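A sketch of building the Router from environment config (the Config struct appears under API Key Management below; NewOpenAIProvider is a hypothetical sibling of the Anthropic constructor shown later):

// internal/ai/provider.go (sketch)
func NewRouterFromEnv(cfg Config) *Router {
	providers := map[string]Provider{}
	if cfg.AnthropicKey != "" {
		providers["anthropic"] = NewAnthropicProvider(cfg.AnthropicKey, cfg.PrimaryModel)
	}
	if cfg.OpenAIKey != "" {
		providers["openai"] = NewOpenAIProvider(cfg.OpenAIKey, cfg.FallbackModel)
	}
	return &Router{providers: providers, primary: "anthropic", fallback: "openai"}
}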
API Key Management
Forge Opinion:
# Environment variables (never in code)
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_AI_KEY=...
// internal/ai/config.go
type Config struct {
AnthropicKey string `env:"ANTHROPIC_API_KEY,required"`
OpenAIKey string `env:"OPENAI_API_KEY"`
GoogleKey string `env:"GOOGLE_AI_KEY"`
PrimaryModel string `env:"AI_PRIMARY_MODEL" envDefault:"claude-3-5-sonnet"`
FallbackModel string `env:"AI_FALLBACK_MODEL" envDefault:"claude-3-haiku"`
}
- Keys in environment variables only
- Use `required` for the primary provider (parsing sketch below)
- Support multiple providers for redundancy
- Never log API keys
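The struct tags above suggest an env-parsing library such as caarlos0/env (an assumption; any env loader works). A minimal loading sketch that fails fast on a missing required key:

// internal/ai/config.go (sketch)
import (
	"github.com/caarlos0/env/v11"
	"github.com/rs/zerolog/log"
)

func LoadConfig() Config {
	var cfg Config
	if err := env.Parse(&cfg); err != nil {
		// A missing ANTHROPIC_API_KEY should stop startup, not fail at request time
		log.Fatal().Err(err).Msg("AI config")
	}
	return cfg
}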
When to Use Each Pattern
| Feature Type | Patterns Needed |
|---|---|
| Simple Q&A | Service layer, rate limiting, fallback |
| Streaming chat | + SSE streaming, cost tracking |
| AI-generated content | + Output validation, component whitelist |
| RAG/knowledge base | + Vector search, context injection |
| Multi-model comparison | + Provider abstraction, parallel execution |
Anti-Patterns to Avoid
| Anti-Pattern | Why It's Bad | Better Approach |
|---|---|---|
| AI calls in handlers | Hard to test, no reuse | Service layer |
| No rate limiting | Easy to abuse/bankrupt | Per-IP + per-session limits |
| Trust AI output | Hallucinations, injection | Validate before render |
| Single provider | Outages kill your app | Provider abstraction + fallback |
| No cost tracking | Surprise bills | Budget limits + alerts |
| Blocking streams | Poor UX | SSE streaming |
AI Tooling Decisions
The Landscape
The LLM tooling ecosystem includes:
| Category | Tools | Purpose |
|---|---|---|
| Orchestration | LangChain, LangGraph, CrewAI | Chain/agent composition |
| Go Frameworks | Eino, LangChainGo, Genkit, tRPC-Agent-Go | Go-native orchestration |
| Observability | LangSmith, Langfuse, Arize Phoenix | Tracing, debugging, cost tracking |
| Direct SDKs | anthropic-go, openai-go, google-genai | Raw API access |
Forge Opinion: Direct SDK First
Default to direct SDK calls. Add frameworks only when complexity justifies overhead.
┌─────────────────────────────────────────────────────────────────┐
│ DECISION FRAMEWORK │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Level 0-1: Direct SDK │
│ ├─ github.com/anthropics/anthropic-sdk-go │
│ ├─ github.com/openai/openai-go │
│ ├─ Custom internal/ai/ package (patterns in this doc) │
│ └─ Structured logging for observability │
│ │
│ Level 2: Evaluate Go Framework │
│ ├─ Only if you have complex multi-step workflows │
│ ├─ Eino (ByteDance) — graph orchestration, production-ready │
│ ├─ Genkit (Google) — prompt management, deployment tools │
│ └─ LangChainGo — familiar patterns, less mature │
│ │
│ Level 3: Consider Python Sidecar │
│ ├─ Only if LangGraph's stateful agents are essential │
│ ├─ Deploy as separate service, Go proxies to it │
│ ├─ Adds complexity: two languages, network hop, deployment │
│ └─ Last resort, not default │
│ │
└─────────────────────────────────────────────────────────────────┘
Why Not LangChain by Default?
| Concern | Details |
|---|---|
| Language mismatch | LangChain is Python; Forge is Go |
| 15-25% overhead | Abstraction layers add latency even for simple calls |
| Unused features cost | Memory, callbacks, chain management add overhead even when not used |
| Deployment complexity | Python sidecar defeats Go's single-binary advantage |
| Developer feedback | "Where good AI projects go to die" — common criticism |
When LangChain/LangGraph IS justified:
- Stateful multi-agent orchestration (Level 3 apps only)
- Complex graph-based workflows with cycles and conditionals
- Team already has Python expertise and infrastructure
Go Framework Comparison
| Framework | Source | Best For | Maturity |
|---|---|---|---|
| Eino | ByteDance | Graph orchestration, streaming, type safety | Production |
| Genkit | Google | Prompt management, deployment, plugins | Production |
| LangChainGo | Community | LangChain patterns in Go | Beta |
| tRPC-Agent-Go | Tencent | Chain/Parallel/Cycle agents | Production |
| Agent SDK Go | Community | OpenAI Assistants-style | Beta |
Forge recommendation: If you need a Go framework, start with Eino or Genkit.
// Example: Eino graph orchestration (illustrative sketch; see the compose
// package in github.com/cloudwego/eino for the exact API)
graph := eino.NewGraph[Input, Output]()
graph.AddNode("extract", extractNode)
graph.AddNode("transform", transformNode)
graph.AddNode("validate", validateNode)
graph.AddEdge("extract", "transform")
graph.AddEdge("transform", "validate")
result, err := graph.Execute(ctx, input)
Observability Stack
Recommendation: Langfuse for production
| Tool | Type | Strengths | Weaknesses |
|---|---|---|---|
| Langfuse | Open source (MIT) | Self-hosted, prompt versioning, 50k free events | Requires ClickHouse/Redis setup |
| Arize Phoenix | Open source | Easy self-host (single Docker), RAG debugging | Less production battle-tested |
| LangSmith | SaaS | Zero overhead, great debugging | LangChain-centric, not self-hostable |
| Helicone | SaaS | Proxy-based, any framework | Adds network hop |
Forge integration pattern:
// internal/ai/observability.go
type Trace struct {
TraceID string
SpanID string
Model string
Input string
Output string
Tokens Usage
Cost float64
Latency time.Duration
Error error
Metadata map[string]any
}
// Send to Langfuse (or structured log for simpler setups)
func (o *Observer) RecordTrace(ctx context.Context, t Trace) error {
// For production: POST to Langfuse API
// For development: structured log
if o.langfuseClient != nil {
return o.langfuseClient.CreateGeneration(ctx, t)
}
log.Info().
Str("trace_id", t.TraceID).
Str("model", t.Model).
Int("input_tokens", t.Tokens.Input).
Int("output_tokens", t.Tokens.Output).
Float64("cost_usd", t.Cost).
Dur("latency", t.Latency).
Msg("ai_completion")
return nil
}
Decision matrix:
| Scenario | Recommendation |
|---|---|
| Solo dev, simple app | Structured logging only |
| Team, Level 1-2 | Langfuse (self-hosted or cloud) |
| Enterprise, Level 2-3 | Langfuse + custom dashboards |
| Already using LangChain | LangSmith (but reconsider LangChain) |
SDK Recommendations
| Provider | Go SDK | Notes |
|---|---|---|
| Anthropic | github.com/anthropics/anthropic-sdk-go | Official, streaming support |
| OpenAI | github.com/openai/openai-go | Official, full API coverage |
| Google | cloud.google.com/go/vertexai | Official, Vertex AI |
| OpenRouter | Direct HTTP | No official SDK, use net/http |
// internal/ai/providers/anthropic.go
import (
	"context"
	"fmt"

	"github.com/anthropics/anthropic-sdk-go"
	"github.com/anthropics/anthropic-sdk-go/option"
)

type AnthropicProvider struct {
	client *anthropic.Client
	model  string
}

func NewAnthropicProvider(apiKey, model string) *AnthropicProvider {
	return &AnthropicProvider{
		// The API key option lives in the SDK's option subpackage
		client: anthropic.NewClient(option.WithAPIKey(apiKey)),
		model:  model,
	}
}

func (p *AnthropicProvider) Complete(ctx context.Context, prompt string, opts Options) (*Response, error) {
	// Param shapes differ across SDK versions; this follows the pre-v1 F() style
	msg, err := p.client.Messages.New(ctx, anthropic.MessageNewParams{
		Model:     anthropic.F(anthropic.Model(p.model)),
		MaxTokens: anthropic.F(int64(opts.MaxTokens)),
		Messages: anthropic.F([]anthropic.MessageParam{
			anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)),
		}),
	})
	if err != nil {
		return nil, err
	}
	if len(msg.Content) == 0 {
		return nil, fmt.Errorf("empty completion")
	}
	return &Response{
		Text:  msg.Content[0].Text,
		Usage: Usage{Input: int(msg.Usage.InputTokens), Output: int(msg.Usage.OutputTokens)},
		Model: p.model,
	}, nil
}
When to Add a Framework
Add a Go framework (Eino/Genkit) when:
- You have 3+ chained LLM calls with conditional logic
- You need graph-based orchestration with cycles
- Multiple team members are building AI features
- You want built-in retry/timeout/streaming abstractions
Add a Python sidecar (LangGraph) when:
- Level 3 AG-UI-style real-time collaboration is required
- Complex stateful agents with human-in-the-loop
- Existing Python ML/AI team and infrastructure
- LangGraph-specific features (checkpointing, time-travel) are essential
Stay with direct SDK when:
- Simple request → response patterns
- Streaming to HTMX/SSE works fine
- Single developer or small team
- You value deployment simplicity
Data Architecture for Multi-Agent Systems
Source: 4 Data Architecture Decisions That Make or Break Agentic Systems (The New Stack, Dec 2025)
When multiple agents operate on the same data, architecture decisions determine success or failure.
The Four Decisions
| Decision | Summary | Forge Stance |
|---|---|---|
| 1. Unified Data Layer | Single source of truth, identity-resolved | PostgreSQL via sqlc |
| 2. Real-time over Batch | Agents on stale data = dangerous decisions | SSE, no batch dependencies |
| 3. Durable Agent Memory | Persistent state for resumable workflows | 12-Factor Agents Factor 6 |
| 4. Co-located Security | RLS in database, not application layer | Pattern below |
Decision 1: Unified Data Layer as "Shared Memory"
"A unified, identity-resolved layer becomes the shared memory. It's what keeps agents grounded and lets them collaborate instead of stepping on each other."
Forge Implementation:
┌─────────────────────────────────────────────────────────────────┐
│ UNIFIED DATA LAYER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PostgreSQL (Single Instance) │
│ ├─ Business data (tournaments, matches, teams) │
│ ├─ Agent execution state (agent_runs, steps, checkpoints) │
│ ├─ Semantic layer (views encoding business logic) │
│ └─ Vector search (pgvector, when needed) │
│ │
│ NO SEPARATE: │
│ ✗ AI-specific database │
│ ✗ Vector store as separate service │
│ ✗ Agent state in Redis (use Postgres) │
│ ✗ Batch-processed analytics tables │
│ │
└─────────────────────────────────────────────────────────────────┘
Entity Resolution Pattern:
// internal/db/entities.go
// All entities use deterministic UUIDs for identity resolution
type Entity struct {
	ID         uuid.UUID `db:"id"`          // Primary identity
	ExternalID string    `db:"external_id"` // Optional external system ID
	EntityType string    `db:"entity_type"` // tournament, team, match, etc.
}

// Agents resolve entities by ID, never by name or fuzzy match
func (q *Queries) ResolveEntity(ctx context.Context, id uuid.UUID) (Entity, error) {
	var e Entity
	err := q.db.QueryRow(ctx, `
		SELECT id, external_id, entity_type
		FROM entities
		WHERE id = $1
	`, id).Scan(&e.ID, &e.ExternalID, &e.EntityType)
	return e, err
}
Decision 2: Real-time over Batch
Why Forge is Already Aligned:
TRADITIONAL (DANGEROUS):
┌─────────┐ Nightly ┌─────────┐ Query ┌─────────┐
│ OLTP │ ──────────▶ │ OLAP │ ◀──────── │ Agent │
└─────────┘ ETL/Batch └─────────┘ └─────────┘
↑
Agent reasons on STALE data
FORGE (SAFE):
┌─────────┐ ┌─────────┐
│Postgres │ ◀──────────▶ │ Agent │
└─────────┘ Real-time └─────────┘
│
│ SSE
▼
┌─────────┐
│ Browser │
└─────────┘
- SSE broadcasts changes immediately (see the broadcaster sketch below)
- No batch ETL pipelines
- Agents query live data
- HTMX swaps reflect current state
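A minimal fan-out sketch for pushing committed changes to SSE subscribers (all names hypothetical):

// internal/events/broadcaster.go (sketch)
type Broadcaster struct {
	mu   sync.Mutex
	subs map[chan string]bool
}

func (b *Broadcaster) Subscribe() chan string {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[ch] = true
	b.mu.Unlock()
	return ch
}

// Publish is called right after a write commits, so agents and
// browsers reason over the same live state
func (b *Broadcaster) Publish(event string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for ch := range b.subs {
		select {
		case ch <- event:
		default: // drop for slow clients rather than block the writer
		}
	}
}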
Decision 3: Durable Agent Memory
Already covered in 12-FACTOR-AGENTS.md Factor 6:
// Checkpoint after every step
run.Steps[run.CurrentStep].Result = result
run.CurrentStep++
if err := run.Save(ctx, r.db); err != nil {
return err
}
// Resume = load state + continue
func (r *Runner) Resume(ctx context.Context, runID string) error {
return r.Run(ctx, runID) // Picks up from CurrentStep
}
Decision 4: Co-located Security (Row-Level Security)
Push authorization into the database layer:
-- migrations/002_agent_rls.sql
-- Enable RLS on agent-accessible tables
ALTER TABLE tournaments ENABLE ROW LEVEL SECURITY;
ALTER TABLE matches ENABLE ROW LEVEL SECURITY;
ALTER TABLE teams ENABLE ROW LEVEL SECURITY;
-- Agent permissions table
CREATE TABLE agent_permissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id UUID NOT NULL,
tournament_id UUID NOT NULL REFERENCES tournaments(id),
permission_level TEXT NOT NULL CHECK (permission_level IN ('read', 'write', 'admin')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(agent_id, tournament_id)
);
-- RLS policy: Agents only see authorized tournaments
CREATE POLICY agent_tournament_access ON tournaments
FOR ALL
USING (
-- Human users bypass (authenticated session)
current_setting('app.user_id', true) IS NOT NULL
OR
-- Agents check permission table
id IN (
SELECT tournament_id FROM agent_permissions
WHERE agent_id = current_setting('app.agent_id', true)::uuid
)
);
-- Apply same pattern to related tables
CREATE POLICY agent_match_access ON matches
FOR ALL
USING (
current_setting('app.user_id', true) IS NOT NULL
OR
tournament_id IN (
SELECT tournament_id FROM agent_permissions
WHERE agent_id = current_setting('app.agent_id', true)::uuid
)
);
Go Integration:
// internal/db/context.go
func (db *DB) WithAgentContext(ctx context.Context, agentID uuid.UUID) (*sql.Conn, error) {
	conn, err := db.pool.Conn(ctx)
	if err != nil {
		return nil, err
	}
	// Session scope (is_local = false): with is_local = true the setting would
	// expire with this statement's implicit transaction and never reach later queries
	_, err = conn.ExecContext(ctx,
		"SELECT set_config('app.agent_id', $1, false)",
		agentID.String())
	if err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}
// Usage in agent handler
func (h *Handler) ExecuteAgentAction(ctx context.Context, agentID uuid.UUID, action Action) error {
	conn, err := h.db.WithAgentContext(ctx, agentID)
	if err != nil {
		return err
	}
	defer func() {
		// Clear the session setting before the conn returns to the pool
		conn.ExecContext(ctx, "RESET app.agent_id")
		conn.Close()
	}()
	// All queries through this connection respect RLS
	return h.service.Execute(ctx, conn, action)
}
Semantic Layer (When Needed)
Level 0-2: NOT NEEDED
- Direct SQL queries via sqlc
- Agents operate on explicit data, not inferred meaning
Level 3+: CONSIDER
-- views/semantic_layer.sql
-- Encode business concepts as views
-- "Active tournament" = business concept
CREATE VIEW active_tournaments AS
SELECT t.*,
COUNT(DISTINCT tm.id) as team_count,
COUNT(DISTINCT m.id) FILTER (WHERE m.status = 'completed') as completed_matches,
COUNT(DISTINCT m.id) as total_matches
FROM tournaments t
LEFT JOIN teams tm ON tm.tournament_id = t.id
LEFT JOIN matches m ON m.tournament_id = t.id
WHERE t.status IN ('registration', 'in_progress')
GROUP BY t.id;
-- "Ready to advance" = business logic encoded
CREATE VIEW matches_ready_to_advance AS
SELECT m.*,
t_a.name as team_a_name,
t_b.name as team_b_name
FROM matches m
JOIN teams t_a ON t_a.id = m.team_a_id
JOIN teams t_b ON t_b.id = m.team_b_id
WHERE m.status = 'completed'
AND m.winner_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM matches next_m
WHERE next_m.round = m.round + 1
AND (next_m.team_a_id = m.winner_id OR next_m.team_b_id = m.winner_id)
);
When to add dedicated semantic layer:
- Multiple agents need shared understanding of business entities
- Natural language queries against business concepts
- Cross-domain reasoning (orders ↔ inventory ↔ shipping)
Vector Search (When Needed)
Add pgvector only when semantic retrieval is required:
-- Only add if you need semantic search
CREATE EXTENSION IF NOT EXISTS vector;
ALTER TABLE documents ADD COLUMN embedding vector(1536);
CREATE INDEX documents_embedding_idx ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
// internal/ai/retrieval.go
func (r *Retriever) SemanticSearch(ctx context.Context, query string, limit int) ([]Document, error) {
// Generate embedding for query
embedding, err := r.ai.Embed(ctx, query)
if err != nil {
return nil, err
}
// Vector similarity search
return r.db.SearchDocuments(ctx, embedding, limit)
}
Don't add vector search if:
- Keyword/full-text search is sufficient
- You're only doing structured queries
- You don't have embeddings infrastructure
Summary: Data Architecture Checklist
┌─────────────────────────────────────────────────────────────────┐
│ FORGE DATA ARCHITECTURE CHECKLIST │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ✓ Single PostgreSQL instance (unified layer) │
│ ✓ UUID-based entity resolution (identity) │
│ ✓ SSE for real-time updates (no batch) │
│ ✓ Agent state in Postgres (durable memory) │
│ ✓ RLS for agent authorization (co-located security) │
│ │
│ ADD WHEN NEEDED: │
│ ○ Semantic views (Level 3+ AI integration) │
│ ○ pgvector (semantic retrieval) │
│ ○ Materialized views (performance optimization) │
│ │
│ AVOID: │
│ ✗ Separate AI/vector database │
│ ✗ Batch ETL pipelines │
│ ✗ Agent state in Redis/memory only │
│ ✗ Application-layer authorization for agents │
│ │
└─────────────────────────────────────────────────────────────────┘
References
- Ask Dad: signal-forge/projects/ask-dad/ (RAG, rate limiting, security)
- Commerce Prompt Analyzer: commerce-prompt-analyzer/ (multi-model, cost guards)
- SIX: six/ (streaming, validation, metrics)
- Eino: github.com/cloudwego/eino (Go graph orchestration)
- Genkit for Go: go.dev/blog/llmpowered (Google's Go framework)
- Langfuse: langfuse.com (open source observability)
- LangChain overhead analysis: fenilsonani.com/articles/langchain-vs-direct-api-performance-analysis