Runtime Agent Stack

Status: Draft
Updated: December 2025
Source: RUNTIME-AGENTS.md
Purpose: Patterns for AI features in running applications (agents that participate at runtime)


The Problem

When your app calls LLMs at runtime, you need:

  1. API Layer — How to call LLM services (streaming, retry, timeout)
  2. Security — API keys, rate limiting, input validation, prompt injection defense
  3. Validation — Ensuring AI output is safe to render
  4. Observability — Tracing, metrics, cost tracking
  5. Fallbacks — What happens when AI fails

Without these, you get insecure, unreliable, expensive AI features.


Reference Implementations

| Project | Pattern | What It Solves |
|---|---|---|
| Ask Dad | RAG + rate limiting | Knowledge-grounded Q&A with abuse prevention |
| Commerce Prompt Analyzer | Multi-model council | Parallel LLM queries with cost guards |
| SIX | Streaming + validation | Real-time AI-generated layouts with Zod validation |

Pattern 1: AI Service Layer

Source: All projects separate AI logic from handlers

internal/
├── handler/           # HTTP handlers (thin)
│   └── chat.go
├── service/           # Business logic
│   └── chat.go
└── ai/                # AI-specific code
    ├── client.go      # LLM API wrapper
    ├── prompts.go     # System prompts
    └── validation.go  # Output validation

Forge Opinion:

AI code lives in internal/ai/, not scattered in handlers:

// internal/ai/client.go
type Client struct {
    anthropic *anthropic.Client
    model     string
    maxTokens int
}

func (c *Client) Complete(ctx context.Context, prompt string) (string, error) {
    // Timeout, retry, error handling
}

func (c *Client) Stream(ctx context.Context, prompt string) (<-chan string, error) {
    // Streaming with SSE compatibility
}
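
A minimal sketch of the service layer that sits between handler and client (ChatService and BuildChatPrompt are hypothetical names, not from the reference projects):

// internal/service/chat.go (hypothetical)
type ChatService struct {
    ai *ai.Client
}

// Answer builds the prompt and delegates to the AI client, so HTTP
// handlers stay thin and the AI call is testable in isolation.
func (s *ChatService) Answer(ctx context.Context, question string) (string, error) {
    prompt := ai.BuildChatPrompt(question) // assumed helper in internal/ai/prompts.go
    return s.ai.Complete(ctx, prompt)
}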

Pattern 2: Streaming Responses

Source: SIX (six/src/app/api/v4/stream/route.ts)

Stream AI responses via SSE for real-time UX:

// internal/handler/chat.go
func (h *Handler) StreamChat(w http.ResponseWriter, r *http.Request) {
    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }

    prompt := r.URL.Query().Get("q") // matches ?q= in the HTMX snippet below

    stream, err := h.ai.Stream(r.Context(), prompt)
    if err != nil {
        http.Error(w, "AI service unavailable", http.StatusBadGateway)
        return
    }

    for chunk := range stream {
        fmt.Fprintf(w, "data: %s\n\n", chunk)
        flusher.Flush()
    }
}

HTMX Integration:

<div hx-ext="sse"
     sse-connect="/api/chat/stream?q=..."
     sse-swap="message"
     hx-swap="beforeend">
</div>

Pattern 3: Rate Limiting

Source: Ask Dad (in-memory), SIX (Redis)

In-Memory (Simple, single instance):

// internal/middleware/ratelimit.go
type RateLimiter struct {
    mu      sync.RWMutex
    buckets map[string]*bucket
}

type bucket struct {
    tokens    int
    lastReset time.Time
}

func (rl *RateLimiter) Allow(ip string, limit int, window time.Duration) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    b, exists := rl.buckets[ip]
    if !exists || time.Since(b.lastReset) > window {
        rl.buckets[ip] = &bucket{tokens: limit - 1, lastReset: time.Now()}
        return true
    }

    if b.tokens <= 0 {
        return false
    }

    b.tokens--
    return true
}
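
The Forge Opinion below pairs the in-memory limiter with a "cleanup goroutine"; a minimal sketch of that eviction loop (interval and max age are assumptions):

// Evict stale buckets periodically so the map doesn't grow without bound.
func (rl *RateLimiter) StartCleanup(interval, maxAge time.Duration) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for range ticker.C {
            rl.mu.Lock()
            for ip, b := range rl.buckets {
                if time.Since(b.lastReset) > maxAge {
                    delete(rl.buckets, ip)
                }
            }
            rl.mu.Unlock()
        }
    }()
}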

Redis (Distributed, multi-instance):

// internal/middleware/ratelimit_redis.go
func (rl *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    // Sliding window using Redis sorted sets
    now := time.Now().UnixMilli()
    windowStart := now - window.Milliseconds()

    pipe := rl.redis.Pipeline()
    pipe.ZRemRangeByScore(ctx, key, "0", strconv.FormatInt(windowStart, 10))
    countCmd := pipe.ZCard(ctx, key)
    pipe.ZAdd(ctx, key, redis.Z{Score: float64(now), Member: now})
    pipe.Expire(ctx, key, window)

    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }

    // Count of requests already in the window, before this one
    return countCmd.Val() < int64(limit), nil
}

Forge Opinion:

| Deployment | Rate Limiter |
|---|---|
| Single instance | In-memory with cleanup goroutine |
| Multi-instance | Redis sliding window |
| Fly.io | Redis (Upstash) or in-memory per region |

Limits per endpoint:

| Endpoint | Limit | Rationale |
|---|---|---|
| Chat/completion | 10/min | Prevent abuse |
| Streaming | 5/min | Higher cost |
| Image generation | 2/min | Highest cost |
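
A sketch of wiring these per-endpoint limits as middleware (the mux and handler names are assumptions):

// internal/middleware/ratelimit.go (continued)
func RateLimit(rl *RateLimiter, limit int, window time.Duration) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            ip, _, _ := net.SplitHostPort(r.RemoteAddr)
            if !rl.Allow(ip, limit, window) {
                http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
                return
            }
            next.ServeHTTP(w, r)
        })
    }
}

// Usage, matching the limits above:
// mux.Handle("/api/chat", RateLimit(rl, 10, time.Minute)(chatHandler))
// mux.Handle("/api/chat/stream", RateLimit(rl, 5, time.Minute)(streamHandler))
// mux.Handle("/api/image", RateLimit(rl, 2, time.Minute)(imageHandler))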

Pattern 4: Prompt Injection Defense

Source: Ask Dad (ask-dad/src/lib/security.ts)

Detect and block common prompt injection patterns:

// internal/ai/security.go
var injectionPatterns = []string{
    `(?i)ignore\s+(all\s+)?previous`,
    `(?i)disregard\s+(all\s+)?instructions`,
    `(?i)forget\s+(everything|all)`,
    `(?i)you\s+are\s+now`,
    `(?i)pretend\s+(to\s+be|you('re|\s+are))`,
    `(?i)act\s+as\s+(if|a)`,
    `(?i)jailbreak`,
    `(?i)DAN\s+mode`,
    `(?i)\bsystem\s*:\s*`,
    `(?i)\bassistant\s*:\s*`,
    `(?i)override\s+(safety|instructions)`,
}

// Compile the patterns once at package init, not on every call
var injectionRegexps = func() []*regexp.Regexp {
    res := make([]*regexp.Regexp, len(injectionPatterns))
    for i, p := range injectionPatterns {
        res[i] = regexp.MustCompile(p)
    }
    return res
}()

func DetectInjection(input string) (bool, string) {
    for i, re := range injectionRegexps {
        if re.MatchString(input) {
            return true, injectionPatterns[i]
        }
    }
    return false, ""
}

func SanitizeInput(input string) string {
    // Remove null bytes, control characters
    input = strings.Map(func(r rune) rune {
        if r == 0 || unicode.IsControl(r) {
            return -1
        }
        return r
    }, input)

    // Trim and limit length (avoid cutting a multi-byte rune in half)
    input = strings.TrimSpace(input)
    if len(input) > 12000 {
        input = strings.ToValidUTF8(input[:12000], "")
    }

    return input
}

Forge Opinion:

  1. Always sanitize user input before including it in prompts
  2. Log, but don't block, merely suspicious patterns (reduces false positives)
  3. Hard block obvious attacks (jailbreak, DAN mode); see the sketch below
  4. Limit length to prevent context stuffing
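
A sketch of that policy, combining the two helpers above (the hard-block list and error name are assumptions):

// CheckInput applies the log-don't-block policy: sanitize, hard-block
// unambiguous attacks, and log everything else that looks suspicious.
var hardBlock = regexp.MustCompile(`(?i)jailbreak|DAN\s+mode`)

var ErrInputRejected = errors.New("input rejected")

func CheckInput(input string) (string, error) {
    input = SanitizeInput(input)
    if hardBlock.MatchString(input) {
        return "", ErrInputRejected
    }
    if hit, pattern := DetectInjection(input); hit {
        log.Warn().Str("pattern", pattern).Msg("possible prompt injection")
    }
    return input, nil
}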

Pattern 5: Output Validation

Source: SIX (six/src/lib/a2ui/widget-specs.ts)

Validate AI output before rendering:

// internal/ai/validation.go
type LayoutResponse struct {
    Components []Component `json:"components" validate:"required,dive"`
}

type Component struct {
    Type      string         `json:"type" validate:"required,oneof=product-card product-grid hero-banner"`
    ProductID string         `json:"productId,omitempty" validate:"omitempty,uuid"`
    Title     string         `json:"title,omitempty" validate:"omitempty,max=200"`
    Props     map[string]any `json:"props,omitempty"`
}

func ValidateLayout(data []byte) (*LayoutResponse, error) {
    var layout LayoutResponse
    if err := json.Unmarshal(data, &layout); err != nil {
        return nil, fmt.Errorf("invalid JSON: %w", err)
    }

    validate := validator.New()
    if err := validate.Struct(layout); err != nil {
        return nil, fmt.Errorf("validation failed: %w", err)
    }

    // Additional business rules
    for _, c := range layout.Components {
        if !isRegisteredComponent(c.Type) {
            return nil, fmt.Errorf("unknown component type: %s", c.Type)
        }
    }

    return &layout, nil
}

Forge Opinion:

  1. Schema validation for structured output (go-playground/validator)
  2. Component whitelist — reject unknown component types
  3. ID validation — verify referenced IDs exist in the database (sketch below)
  4. Size limits — cap arrays, string lengths
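
A sketch of the ID check (the ProductExists query helper is an assumption):

// VerifyProductIDs confirms every product referenced by the layout
// actually exists before the layout is rendered.
func (s *Service) VerifyProductIDs(ctx context.Context, layout *LayoutResponse) error {
    for _, c := range layout.Components {
        if c.ProductID == "" {
            continue
        }
        exists, err := s.db.ProductExists(ctx, c.ProductID) // assumed sqlc query
        if err != nil {
            return err
        }
        if !exists {
            return fmt.Errorf("layout references unknown product: %s", c.ProductID)
        }
    }
    return nil
}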

Pattern 6: Cost Management

Source: Commerce Prompt Analyzer (commerce-prompt-analyzer/src/services/openRouterService.ts)

Track and limit AI spending:

// internal/ai/cost.go
type CostTracker struct {
    mu           sync.RWMutex
    sessionCosts map[string]float64
}

var modelCosts = map[string]struct{ input, output float64 }{
    "claude-3-5-sonnet": {0.003, 0.015},   // per 1K tokens
    "gpt-4o":            {0.005, 0.015},
    "claude-3-haiku":    {0.00025, 0.00125},
}

func (ct *CostTracker) EstimateCost(model string, inputTokens, outputTokens int) float64 {
    costs, ok := modelCosts[model]
    if !ok {
        return 0
    }
    return (float64(inputTokens)/1000)*costs.input +
           (float64(outputTokens)/1000)*costs.output
}

func (ct *CostTracker) CheckBudget(sessionID string, estimated float64) error {
    ct.mu.RLock()
    current := ct.sessionCosts[sessionID]
    ct.mu.RUnlock()

    if current+estimated > 10.0 { // $10 session limit
        return ErrBudgetExceeded
    }
    return nil
}

func (ct *CostTracker) RecordCost(sessionID string, cost float64) {
    ct.mu.Lock()
    ct.sessionCosts[sessionID] += cost
    ct.mu.Unlock()
}

Forge Opinion:

| Limit Type | Threshold | Action |
|---|---|---|
| Per-request | $0.10 | Reject |
| Per-session | $5.00 | Warn |
| Per-session | $10.00 | Hard block |
| Per-minute | $0.50 | Rate limit |
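
A sketch of the per-request ceiling (the threshold comes from the table; the error name is an assumption):

var ErrTooExpensive = errors.New("estimated request cost exceeds per-request limit")

// AllowRequest rejects a single call whose estimated cost crosses
// the per-request threshold, before any tokens are spent.
func (ct *CostTracker) AllowRequest(model string, inputTokens, maxOutputTokens int) error {
    if ct.EstimateCost(model, inputTokens, maxOutputTokens) > 0.10 {
        return ErrTooExpensive
    }
    return nil
}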

Pattern 7: Fallback Strategies

Source: Ask Dad, SIX (graceful degradation)

// internal/ai/fallback.go
func (c *Client) CompleteWithFallback(ctx context.Context, prompt string) (string, error) {
    // Try primary model
    result, err := c.complete(ctx, c.primaryModel, prompt)
    if err == nil {
        return result, nil
    }

    log.Warn().Err(err).Msg("primary model failed, trying fallback")

    // Try fallback model (cheaper, faster)
    result, err = c.complete(ctx, c.fallbackModel, prompt)
    if err == nil {
        return result, nil
    }

    log.Error().Err(err).Msg("fallback model failed")

    // Return cached/default response
    return c.getCachedResponse(prompt)
}

For UI features:

func (h *Handler) GetRecommendations(w http.ResponseWriter, r *http.Request) {
    recs, err := h.ai.GetRecommendations(r.Context(), userID)
    if err != nil {
        log.Warn().Err(err).Msg("AI recommendations failed")
        // Fall back to non-AI recommendations
        recs = h.service.GetPopularItems(r.Context())
    }

    h.render(w, "recommendations", recs)
}

Forge Opinion:

| Failure | Fallback |
|---|---|
| Primary model timeout | Try faster/cheaper model |
| All models fail | Return cached response |
| AI feature fails | Degrade to non-AI version |
| Validation fails | Reject and retry once |
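
A sketch of the "reject and retry once" row, reusing ValidateLayout from Pattern 5 (the method name and wiring are assumptions):

// CompleteValidated retries a completion once if its output fails validation.
func (c *Client) CompleteValidated(ctx context.Context, prompt string) (*LayoutResponse, error) {
    var lastErr error
    for attempt := 0; attempt < 2; attempt++ {
        raw, err := c.Complete(ctx, prompt)
        if err != nil {
            return nil, err
        }
        layout, err := ValidateLayout([]byte(raw))
        if err == nil {
            return layout, nil
        }
        lastErr = err
        log.Warn().Err(err).Int("attempt", attempt).Msg("AI output failed validation")
    }
    return nil, fmt.Errorf("output invalid after retry: %w", lastErr)
}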

Pattern 8: Observability

Source: SIX (six/src/app/api/v4/stream/route.ts)

Track AI performance and costs:

// internal/ai/metrics.go
type Metrics struct {
    RequestCount    prometheus.Counter
    RequestDuration prometheus.Histogram
    TokensUsed      prometheus.Counter
    CostTotal       prometheus.Counter
    ErrorCount      prometheus.Counter
}

func (c *Client) Complete(ctx context.Context, prompt string) (string, error) {
    start := time.Now()

    result, err := c.doComplete(ctx, prompt)

    duration := time.Since(start)
    c.metrics.RequestDuration.Observe(duration.Seconds())
    c.metrics.RequestCount.Inc()

    if err != nil {
        c.metrics.ErrorCount.Inc()
        return "", err
    }

    c.metrics.TokensUsed.Add(float64(result.Usage.TotalTokens))
    c.metrics.CostTotal.Add(result.Cost)

    return result.Text, nil
}

Structured Logging:

log.Info().
    Str("model", model).
    Int("input_tokens", usage.InputTokens).
    Int("output_tokens", usage.OutputTokens).
    Float64("cost_usd", cost).
    Dur("latency", duration).
    Str("session_id", sessionID).
    Msg("AI completion")

Forge Opinion:

Log every AI call with:

  • Model used
  • Token counts (input/output)
  • Cost (calculated)
  • Latency
  • Session/user ID (for debugging)
  • Error (if any)

Pattern 9: Multi-Provider Abstraction

Source: Commerce Prompt Analyzer, SIX

Don't lock into one provider:

// internal/ai/provider.go
type Provider interface {
    Complete(ctx context.Context, prompt string, opts Options) (*Response, error)
    Stream(ctx context.Context, prompt string, opts Options) (<-chan Chunk, error)
    Name() string
}

type Options struct {
    MaxTokens   int
    Temperature float64
    Model       string
}

type Response struct {
    Text   string
    Usage  Usage
    Model  string
    Cost   float64
}

// Implementations
type AnthropicProvider struct { ... }
type OpenAIProvider struct { ... }
type GoogleProvider struct { ... }

// Router
type Router struct {
    providers map[string]Provider
    primary   string
    fallback  string
}

func (r *Router) Complete(ctx context.Context, prompt string, opts Options) (*Response, error) {
    provider := r.providers[r.primary]
    resp, err := provider.Complete(ctx, prompt, opts)
    if err != nil && r.fallback != "" {
        provider = r.providers[r.fallback]
        return provider.Complete(ctx, prompt, opts)
    }
    return resp, err
}

Forge Opinion:

  • Abstract providers behind interface
  • Configure primary + fallback in environment (wiring sketch below)
  • Don't hardcode model names in business logic
  • Log which provider was used
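
A sketch of that wiring, reusing the Config from the API Key Management section below (registering Anthropic twice at two model tiers is an assumption, not the reference projects' approach):

// Build the Router from environment config; business logic only
// ever sees the Provider interface, never a hardcoded model name.
func NewRouter(cfg Config) *Router {
    return &Router{
        providers: map[string]Provider{
            "anthropic-primary":  NewAnthropicProvider(cfg.AnthropicKey, cfg.PrimaryModel),
            "anthropic-fallback": NewAnthropicProvider(cfg.AnthropicKey, cfg.FallbackModel),
        },
        primary:  "anthropic-primary",
        fallback: "anthropic-fallback",
    }
}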

API Key Management

Forge Opinion:

// Environment variables (never in code)
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_AI_KEY=...

// internal/ai/config.go
type Config struct {
    AnthropicKey  string `env:"ANTHROPIC_API_KEY,required"`
    OpenAIKey     string `env:"OPENAI_API_KEY"`
    GoogleKey     string `env:"GOOGLE_AI_KEY"`
    PrimaryModel  string `env:"AI_PRIMARY_MODEL" envDefault:"claude-3-5-sonnet"`
    FallbackModel string `env:"AI_FALLBACK_MODEL" envDefault:"claude-3-haiku"`
}

  • Keys in environment variables only
  • Use required for the primary provider's key
  • Support multiple providers for redundancy
  • Never log API keys

When to Use Each Pattern

| Feature Type | Patterns Needed |
|---|---|
| Simple Q&A | Service layer, rate limiting, fallback |
| Streaming chat | + SSE streaming, cost tracking |
| AI-generated content | + Output validation, component whitelist |
| RAG/knowledge base | + Vector search, context injection |
| Multi-model comparison | + Provider abstraction, parallel execution |

Anti-Patterns to Avoid

| Anti-Pattern | Why It's Bad | Better Approach |
|---|---|---|
| AI calls in handlers | Hard to test, no reuse | Service layer |
| No rate limiting | Easy to abuse/bankrupt | Per-IP + per-session limits |
| Trust AI output | Hallucinations, injection | Validate before render |
| Single provider | Outages kill your app | Provider abstraction + fallback |
| No cost tracking | Surprise bills | Budget limits + alerts |
| Blocking streams | Poor UX | SSE streaming |

AI Tooling Decisions

The Landscape

The LLM tooling ecosystem includes:

| Category | Tools | Purpose |
|---|---|---|
| Orchestration | LangChain, LangGraph, CrewAI | Chain/agent composition |
| Go Frameworks | Eino, LangChainGo, Genkit, tRPC-Agent-Go | Go-native orchestration |
| Observability | LangSmith, Langfuse, Arize Phoenix | Tracing, debugging, cost tracking |
| Direct SDKs | anthropic-go, openai-go, google-genai | Raw API access |

Forge Opinion: Direct SDK First

Default to direct SDK calls. Add frameworks only when complexity justifies overhead.

┌─────────────────────────────────────────────────────────────────┐
│  DECISION FRAMEWORK                                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  Level 0-1: Direct SDK                                           │
│  ├─ github.com/anthropics/anthropic-sdk-go                      │
│  ├─ github.com/openai/openai-go                                 │
│  ├─ Custom internal/ai/ package (patterns in this doc)          │
│  └─ Structured logging for observability                        │
│                                                                   │
│  Level 2: Evaluate Go Framework                                  │
│  ├─ Only if you have complex multi-step workflows               │
│  ├─ Eino (ByteDance) — graph orchestration, production-ready    │
│  ├─ Genkit (Google) — prompt management, deployment tools       │
│  └─ LangChainGo — familiar patterns, less mature                │
│                                                                   │
│  Level 3: Consider Python Sidecar                                │
│  ├─ Only if LangGraph's stateful agents are essential           │
│  ├─ Deploy as separate service, Go proxies to it                │
│  ├─ Adds complexity: two languages, network hop, deployment     │
│  └─ Last resort, not default                                     │
│                                                                   │
└─────────────────────────────────────────────────────────────────┘

Why Not LangChain by Default?

| Concern | Details |
|---|---|
| Language mismatch | LangChain is Python; Forge is Go |
| 15-25% overhead | Abstraction layers add latency even for simple calls |
| Unused features cost | Memory, callbacks, chain management add overhead even when not used |
| Deployment complexity | Python sidecar defeats Go's single-binary advantage |
| Developer feedback | "Where good AI projects go to die" — common criticism |

When LangChain/LangGraph IS justified:

  • Stateful multi-agent orchestration (Level 3 apps only)
  • Complex graph-based workflows with cycles and conditionals
  • Team already has Python expertise and infrastructure

Go Framework Comparison

| Framework | Source | Best For | Maturity |
|---|---|---|---|
| Eino | ByteDance | Graph orchestration, streaming, type safety | Production |
| Genkit | Google | Prompt management, deployment, plugins | Production |
| LangChainGo | Community | LangChain patterns in Go | Beta |
| tRPC-Agent-Go | Tencent | Chain/Parallel/Cycle agents | Production |
| Agent SDK Go | Community | OpenAI Assistants-style | Beta |

Forge recommendation: If you need a Go framework, start with Eino or Genkit.

// Example: Eino-style graph orchestration (illustrative; see Eino docs for exact API)
graph := eino.NewGraph[Input, Output]()
graph.AddNode("extract", extractNode)
graph.AddNode("transform", transformNode)
graph.AddNode("validate", validateNode)
graph.AddEdge("extract", "transform")
graph.AddEdge("transform", "validate")
result, err := graph.Execute(ctx, input)

Observability Stack

Recommendation: Langfuse for production

| Tool | Type | Strengths | Weaknesses |
|---|---|---|---|
| Langfuse | Open source (MIT) | Self-hosted, prompt versioning, 50k free events | Requires ClickHouse/Redis setup |
| Arize Phoenix | Open source | Easy self-host (single Docker), RAG debugging | Less production battle-tested |
| LangSmith | SaaS | Zero overhead, great debugging | LangChain-centric, not self-hostable |
| Helicone | SaaS | Proxy-based, any framework | Adds network hop |

Forge integration pattern:

// internal/ai/observability.go
type Trace struct {
    TraceID   string
    SpanID    string
    Model     string
    Input     string
    Output    string
    Tokens    Usage
    Cost      float64
    Latency   time.Duration
    Error     error
    Metadata  map[string]any
}

// Send to Langfuse (or structured log for simpler setups)
func (o *Observer) RecordTrace(ctx context.Context, t Trace) error {
    // For production: POST to Langfuse API
    // For development: structured log
    if o.langfuseClient != nil {
        return o.langfuseClient.CreateGeneration(ctx, t)
    }
    log.Info().
        Str("trace_id", t.TraceID).
        Str("model", t.Model).
        Int("input_tokens", t.Tokens.Input).
        Int("output_tokens", t.Tokens.Output).
        Float64("cost_usd", t.Cost).
        Dur("latency", t.Latency).
        Msg("ai_completion")
    return nil
}

Decision matrix:

| Scenario | Recommendation |
|---|---|
| Solo dev, simple app | Structured logging only |
| Team, Level 1-2 | Langfuse (self-hosted or cloud) |
| Enterprise, Level 2-3 | Langfuse + custom dashboards |
| Already using LangChain | LangSmith (but reconsider LangChain) |

SDK Recommendations

| Provider | Go SDK | Notes |
|---|---|---|
| Anthropic | github.com/anthropics/anthropic-sdk-go | Official, streaming support |
| OpenAI | github.com/openai/openai-go | Official, full API coverage |
| Google | cloud.google.com/go/vertexai | Official, Vertex AI |
| OpenRouter | Direct HTTP | No official SDK, use net/http |

// internal/ai/providers/anthropic.go
import "github.com/anthropics/anthropic-sdk-go"

type AnthropicProvider struct {
    client *anthropic.Client
    model  string
}

func NewAnthropicProvider(apiKey, model string) *AnthropicProvider {
    return &AnthropicProvider{
        client: anthropic.NewClient(anthropic.WithAPIKey(apiKey)),
        model:  model,
    }
}

func (p *AnthropicProvider) Complete(ctx context.Context, prompt string, opts Options) (*Response, error) {
    msg, err := p.client.Messages.New(ctx, anthropic.MessageNewParams{
        Model:     anthropic.F(p.model),
        MaxTokens: anthropic.F(int64(opts.MaxTokens)),
        Messages: anthropic.F([]anthropic.MessageParam{
            anthropic.NewUserMessage(anthropic.NewTextBlock(prompt)),
        }),
    })
    if err != nil {
        return nil, err
    }

    return &Response{
        Text:  msg.Content[0].Text,
        Usage: Usage{Input: msg.Usage.InputTokens, Output: msg.Usage.OutputTokens},
        Model: p.model,
    }, nil
}

When to Add a Framework

Add a Go framework (Eino/Genkit) when:

  • You have 3+ chained LLM calls with conditional logic
  • You need graph-based orchestration with cycles
  • Multiple team members are building AI features
  • You want built-in retry/timeout/streaming abstractions

Add a Python sidecar (LangGraph) when:

  • Level 3 AG-UI-style real-time collaboration is required
  • Complex stateful agents with human-in-the-loop
  • Existing Python ML/AI team and infrastructure
  • LangGraph-specific features (checkpointing, time-travel) are essential

Stay with direct SDK when:

  • Simple request → response patterns
  • Streaming to HTMX/SSE works fine
  • Single developer or small team
  • You value deployment simplicity

Data Architecture for Multi-Agent Systems

Source: 4 Data Architecture Decisions That Make or Break Agentic Systems (The New Stack, Dec 2025)

When multiple agents operate on the same data, architecture decisions determine success or failure.

The Four Decisions

| Decision | Summary | Forge Stance |
|---|---|---|
| 1. Unified Data Layer | Single source of truth, identity-resolved | PostgreSQL via sqlc |
| 2. Real-time over Batch | Agents on stale data = dangerous decisions | SSE, no batch dependencies |
| 3. Durable Agent Memory | Persistent state for resumable workflows | 12-Factor Agents Factor 6 |
| 4. Co-located Security | RLS in database, not application layer | Pattern below |

Decision 1: Unified Data Layer as "Shared Memory"

"A unified, identity-resolved layer becomes the shared memory. It's what keeps agents grounded and lets them collaborate instead of stepping on each other."

Forge Implementation:

┌─────────────────────────────────────────────────────────────────┐
│  UNIFIED DATA LAYER                                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  PostgreSQL (Single Instance)                                    │
│  ├─ Business data (tournaments, matches, teams)                 │
│  ├─ Agent execution state (agent_runs, steps, checkpoints)      │
│  ├─ Semantic layer (views encoding business logic)              │
│  └─ Vector search (pgvector, when needed)                       │
│                                                                   │
│  NO SEPARATE:                                                    │
│  ✗ AI-specific database                                         │
│  ✗ Vector store as separate service                             │
│  ✗ Agent state in Redis (use Postgres)                          │
│  ✗ Batch-processed analytics tables                             │
│                                                                   │
└─────────────────────────────────────────────────────────────────┘

Entity Resolution Pattern:

// internal/db/entities.go
// All entities use deterministic UUIDs for identity resolution

type Entity struct {
    ID        uuid.UUID `db:"id"`          // Primary identity
    ExternalID string   `db:"external_id"` // Optional external system ID
    EntityType string   `db:"entity_type"` // tournament, team, match, etc.
}

// Agents resolve entities by ID, never by name or fuzzy match
func (q *Queries) ResolveEntity(ctx context.Context, id uuid.UUID) (Entity, error) {
    return q.db.QueryRow(ctx, `
        SELECT id, external_id, entity_type
        FROM entities
        WHERE id = $1
    `, id).Scan(...)
}

Decision 2: Real-time over Batch

Why Forge is Already Aligned:

TRADITIONAL (DANGEROUS):
┌─────────┐    Nightly    ┌─────────┐    Query    ┌─────────┐
│  OLTP   │ ──────────▶  │  OLAP   │ ◀──────── │  Agent  │
└─────────┘   ETL/Batch   └─────────┘            └─────────┘
                              ↑
                    Agent reasons on STALE data

FORGE (SAFE):
┌─────────┐                ┌─────────┐
│Postgres │ ◀──────────▶  │  Agent  │
└─────────┘   Real-time    └─────────┘
     │
     │ SSE
     ▼
┌─────────┐
│ Browser │
└─────────┘

  • SSE broadcasts changes immediately (see the hub sketch below)
  • No batch ETL pipelines
  • Agents query live data
  • HTMX swaps reflect current state
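
A minimal sketch of the broadcast path (the Hub type and render helper are assumptions; the point is that subscribers see a committed change immediately, with no batch step in between):

// Hub fans a committed change out to all connected SSE clients.
type Hub struct {
    mu   sync.Mutex
    subs map[chan string]struct{}
}

func (h *Hub) Broadcast(event string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    for ch := range h.subs {
        select {
        case ch <- event:
        default: // never block on a slow subscriber
        }
    }
}

// After the Postgres write commits:
// hub.Broadcast(renderMatchRow(match)) // hypothetical render helper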

Decision 3: Durable Agent Memory

Already covered in 12-FACTOR-AGENTS.md Factor 6:

// Checkpoint after every step
run.Steps[run.CurrentStep].Result = result
run.CurrentStep++
if err := run.Save(ctx, r.db); err != nil {
    return err
}

// Resume = load state + continue
func (r *Runner) Resume(ctx context.Context, runID string) error {
    return r.Run(ctx, runID) // Picks up from CurrentStep
}

Decision 4: Co-located Security (Row-Level Security)

Push authorization into the database layer:

-- migrations/002_agent_rls.sql

-- Enable RLS on agent-accessible tables
ALTER TABLE tournaments ENABLE ROW LEVEL SECURITY;
ALTER TABLE matches ENABLE ROW LEVEL SECURITY;
ALTER TABLE teams ENABLE ROW LEVEL SECURITY;

-- Agent permissions table
CREATE TABLE agent_permissions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    agent_id UUID NOT NULL,
    tournament_id UUID NOT NULL REFERENCES tournaments(id),
    permission_level TEXT NOT NULL CHECK (permission_level IN ('read', 'write', 'admin')),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(agent_id, tournament_id)
);

-- RLS policy: Agents only see authorized tournaments
CREATE POLICY agent_tournament_access ON tournaments
    FOR ALL
    USING (
        -- Human users bypass (authenticated session)
        current_setting('app.user_id', true) IS NOT NULL
        OR
        -- Agents check permission table
        id IN (
            SELECT tournament_id FROM agent_permissions
            WHERE agent_id = current_setting('app.agent_id', true)::uuid
        )
    );

-- Apply same pattern to related tables
CREATE POLICY agent_match_access ON matches
    FOR ALL
    USING (
        current_setting('app.user_id', true) IS NOT NULL
        OR
        tournament_id IN (
            SELECT tournament_id FROM agent_permissions
            WHERE agent_id = current_setting('app.agent_id', true)::uuid
        )
    );

Go Integration:

// internal/db/context.go
func (db *DB) WithAgentContext(ctx context.Context, agentID uuid.UUID) (*sql.Conn, error) {
    conn, err := db.pool.Conn(ctx)
    if err != nil {
        return nil, err
    }

    // Set agent context for RLS. is_local=false makes the setting
    // session-scoped on this dedicated connection; a transaction-local
    // setting (true) would be discarded as soon as this statement's
    // implicit transaction ends.
    _, err = conn.ExecContext(ctx,
        "SELECT set_config('app.agent_id', $1, false)",
        agentID.String())
    if err != nil {
        conn.Close()
        return nil, err
    }

    return conn, nil
}

// Usage in agent handler
func (h *Handler) ExecuteAgentAction(ctx context.Context, agentID uuid.UUID, action Action) error {
    conn, err := h.db.WithAgentContext(ctx, agentID)
    if err != nil {
        return err
    }
    defer conn.Close()

    // All queries through this connection respect RLS
    return h.service.Execute(ctx, conn, action)
}

Semantic Layer (When Needed)

Level 0-2: NOT NEEDED

  • Direct SQL queries via sqlc
  • Agents operate on explicit data, not inferred meaning

Level 3+: CONSIDER

-- views/semantic_layer.sql
-- Encode business concepts as views

-- "Active tournament" = business concept
CREATE VIEW active_tournaments AS
SELECT t.*,
       COUNT(DISTINCT tm.id) as team_count,
       COUNT(DISTINCT m.id) FILTER (WHERE m.status = 'completed') as completed_matches,
       COUNT(DISTINCT m.id) as total_matches
FROM tournaments t
LEFT JOIN teams tm ON tm.tournament_id = t.id
LEFT JOIN matches m ON m.tournament_id = t.id
WHERE t.status IN ('registration', 'in_progress')
GROUP BY t.id;

-- "Ready to advance" = business logic encoded
CREATE VIEW matches_ready_to_advance AS
SELECT m.*,
       t_a.name as team_a_name,
       t_b.name as team_b_name
FROM matches m
JOIN teams t_a ON t_a.id = m.team_a_id
JOIN teams t_b ON t_b.id = m.team_b_id
WHERE m.status = 'completed'
  AND m.winner_id IS NOT NULL
  AND NOT EXISTS (
      SELECT 1 FROM matches next_m
      WHERE next_m.round = m.round + 1
        AND (next_m.team_a_id = m.winner_id OR next_m.team_b_id = m.winner_id)
  );

When to add dedicated semantic layer:

  • Multiple agents need shared understanding of business entities
  • Natural language queries against business concepts
  • Cross-domain reasoning (orders ↔ inventory ↔ shipping)

Vector Search (When Needed)

Add pgvector only when semantic retrieval is required:

-- Only add if you need semantic search
CREATE EXTENSION IF NOT EXISTS vector;

ALTER TABLE documents ADD COLUMN embedding vector(1536);

CREATE INDEX documents_embedding_idx ON documents
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);

// internal/ai/retrieval.go
func (r *Retriever) SemanticSearch(ctx context.Context, query string, limit int) ([]Document, error) {
    // Generate embedding for query
    embedding, err := r.ai.Embed(ctx, query)
    if err != nil {
        return nil, err
    }

    // Vector similarity search
    return r.db.SearchDocuments(ctx, embedding, limit)
}
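
The similarity query behind SearchDocuments might look like this (column names are assumptions; <=> is pgvector's cosine-distance operator, matching the vector_cosine_ops index above):

// internal/db/search.go (hypothetical pgx-style query)
const searchDocumentsSQL = `
    SELECT id, title, body
    FROM documents
    ORDER BY embedding <=> $1
    LIMIT $2`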

Don't add vector search if:

  • Keyword/full-text search is sufficient
  • You're only doing structured queries
  • You don't have embeddings infrastructure

Summary: Data Architecture Checklist

┌─────────────────────────────────────────────────────────────────┐
│  FORGE DATA ARCHITECTURE CHECKLIST                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ✓ Single PostgreSQL instance (unified layer)                   │
│  ✓ UUID-based entity resolution (identity)                      │
│  ✓ SSE for real-time updates (no batch)                         │
│  ✓ Agent state in Postgres (durable memory)                     │
│  ✓ RLS for agent authorization (co-located security)            │
│                                                                   │
│  ADD WHEN NEEDED:                                                │
│  ○ Semantic views (Level 3+ AI integration)                     │
│  ○ pgvector (semantic retrieval)                                │
│  ○ Materialized views (performance optimization)                │
│                                                                   │
│  AVOID:                                                          │
│  ✗ Separate AI/vector database                                  │
│  ✗ Batch ETL pipelines                                          │
│  ✗ Agent state in Redis/memory only                             │
│  ✗ Application-layer authorization for agents                   │
│                                                                   │
└─────────────────────────────────────────────────────────────────┘

References

  • Ask Dad: signal-forge/projects/ask-dad/ (RAG, rate limiting, security)
  • Commerce Prompt Analyzer: commerce-prompt-analyzer/ (multi-model, cost guards)
  • SIX: six/ (streaming, validation, metrics)
  • Eino: github.com/cloudwego/eino (Go graph orchestration)
  • Genkit for Go: go.dev/blog/llmpowered (Google's Go framework)
  • Langfuse: langfuse.com (open source observability)
  • LangChain overhead analysis: fenilsonani.com/articles/langchain-vs-direct-api-performance-analysis