eventbus

import "github.com/thesimonho/warden/eventbus"

Package eventbus implements the runtime event processing pipeline. It maintains in-memory container state, broadcasts changes to SSE subscribers, writes audit logs, and monitors container liveness. Event types are defined in the [event] package; filesystem ingestion is handled by watcher/hook.

func StartLivenessChecker(ctx context.Context, store *Store)

StartLivenessChecker runs a goroutine that periodically checks whether containers are still sending heartbeats. When a container misses enough heartbeats (exceeds the staleness threshold), all its worktree states are cleared and frontends are notified.

Stops when ctx is cancelled.
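
The periodic check described above can be sketched as a ticker loop that exits on context cancellation. This is a minimal illustration under stated assumptions, not the package's implementation: `findStale`, `runChecker`, the polling interval, and the 30-second threshold are all hypothetical names and values chosen for the example.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// findStale returns the containers whose last event is older than threshold
// relative to now. Hypothetical helper; not part of the eventbus API.
func findStale(lastSeen map[string]time.Time, now time.Time, threshold time.Duration) []string {
	var stale []string
	for name, seen := range lastSeen {
		if now.Sub(seen) > threshold {
			stale = append(stale, name)
		}
	}
	sort.Strings(stale) // map iteration order is random; sort for stable output
	return stale
}

// runChecker polls on a ticker and calls onStale for each stale container.
// It returns as soon as ctx is cancelled.
func runChecker(ctx context.Context, interval, threshold time.Duration,
	snapshot func() map[string]time.Time, onStale func(string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			for _, name := range findStale(snapshot(), now, threshold) {
				onStale(name)
			}
		}
	}
}

// demoStale evaluates one check against fixed timestamps.
func demoStale() []string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	lastSeen := map[string]time.Time{
		"warden-a": now.Add(-5 * time.Second),  // recent heartbeat
		"warden-b": now.Add(-90 * time.Second), // missed several heartbeats
	}
	return findStale(lastSeen, now, 30*time.Second) // threshold is an assumed value
}

func main() {
	fmt.Println(demoStale())
}
```

In a real caller, `runChecker` would be started with `go` and stopped by cancelling the context, which matches the "Stops when ctx is cancelled" contract.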

AliveCallbackFunc is called when a container transitions from unknown/stale to alive. Fires on the first lifecycle event (heartbeat or session_start) for a container not in lastEvents. The service layer uses this to reactively start session watchers. Set via Store.SetAliveCallback.

type AliveCallbackFunc func(projectID, agentType, containerName string)

Broker manages SSE client connections and fans out events.

It maintains a set of subscriber channels. Broadcast sends to all subscribers; slow subscribers that can’t keep up have events dropped rather than blocking.

type Broker struct {
// contains filtered or unexported fields
}

func NewBroker() *Broker

NewBroker creates and starts a new SSE broker. The heartbeat goroutine runs until Shutdown is called.

func (b *Broker) Broadcast(evt event.SSEEvent)

Broadcast sends an event to all connected clients. Slow clients have events dropped to avoid blocking.

func (b *Broker) ClientCount() int

ClientCount returns the current number of connected SSE clients.

func (b *Broker) Shutdown()

Shutdown stops the heartbeat goroutine and closes all client channels.

func (b *Broker) Subscribe() (<-chan event.SSEEvent, func())

Subscribe registers a new SSE client and returns its event channel and an unsubscribe function. The caller must call unsubscribe when the client disconnects.
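
The drop-rather-than-block behaviour is commonly built from buffered channels and a `select` with a `default` branch. The sketch below assumes that approach; `miniBroker` and its methods are hypothetical stand-ins that use `string` events, and the real Broker may differ (for example, it also runs a heartbeat goroutine that is not modelled here).

```go
package main

import (
	"fmt"
	"sync"
)

// miniBroker is a hypothetical stand-in for a fan-out broker.
type miniBroker struct {
	mu      sync.Mutex
	clients map[chan string]struct{}
}

func newMiniBroker() *miniBroker {
	return &miniBroker{clients: make(map[chan string]struct{})}
}

// subscribe registers a buffered channel and returns it with an unsubscribe
// function that is safe to call more than once.
func (b *miniBroker) subscribe(buffer int) (<-chan string, func()) {
	ch := make(chan string, buffer)
	b.mu.Lock()
	b.clients[ch] = struct{}{}
	b.mu.Unlock()

	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			b.mu.Lock()
			delete(b.clients, ch)
			close(ch) // closed under the lock, so broadcast never sends on it
			b.mu.Unlock()
		})
	}
	return ch, unsubscribe
}

// broadcast offers msg to every client without blocking and reports how many
// sends were dropped because a client's buffer was full.
func (b *miniBroker) broadcast(msg string) (dropped int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for ch := range b.clients {
		select {
		case ch <- msg:
		default:
			dropped++ // slow client: drop instead of blocking the sender
		}
	}
	return dropped
}

// demoBroker shows one delivered and one dropped event for a slow client.
func demoBroker() (received []string, dropped int) {
	b := newMiniBroker()
	ch, unsubscribe := b.subscribe(1)
	dropped += b.broadcast("first")  // fits in the buffer
	dropped += b.broadcast("second") // buffer full, so this one is dropped
	unsubscribe()
	for msg := range ch { // drains what was buffered before the close
		received = append(received, msg)
	}
	return received, dropped
}

func main() {
	received, dropped := demoBroker()
	fmt.Println(received, dropped)
}
```

The trade-off is that a slow client silently misses events, so consumers of such a stream generally need to tolerate gaps or refetch state after reconnecting.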

CostUpdateCallbackFunc is called on every cost update event with cumulative cost data parsed from the event payload. When the event carried no cost data, sessionID is empty and cost is zero. Set via Store.SetCostUpdateCallback. The identifying parameters are: projectID, the deterministic project identifier (from WARDEN_PROJECT_ID); agentType, the agent type identifier (from WARDEN_AGENT_TYPE); and containerName, the Docker container name (from WARDEN_CONTAINER_NAME).

type CostUpdateCallbackFunc func(projectID, agentType, containerName, sessionID string, cost float64, isEstimated bool)

ProjectCost holds accumulated cost for a container.

type ProjectCost struct {
	// TotalCost is the aggregate USD cost across all sessions.
	TotalCost float64
	// MessageCount is the total number of messages.
	MessageCount int
	// IsEstimated is true when the cost is an estimate (subscription user).
	IsEstimated bool
	// UpdatedAt is when cost was last reported.
	UpdatedAt time.Time
}

ProjectStatePayload is the JSON shape sent over SSE for project_state events. Carries both cost and attention state so the home page can update in real time.

type ProjectStatePayload struct {
	event.ProjectRef
	TotalCost        float64                `json:"totalCost"`
	MessageCount     int                    `json:"messageCount"`
	NeedsInput       bool                   `json:"needsInput"`
	NotificationType event.NotificationType `json:"notificationType,omitempty"`
}

StaleCallbackFunc is called when a container stops sending heartbeats and is marked stale. The service layer uses this to write an audit entry with full project context (project ID and name). Set via Store.SetStaleCallback.

type StaleCallbackFunc func(containerName string)

Store holds in-memory state derived from container events.

Thread-safe for concurrent reads from API handlers and writes from the file watcher goroutine.

type Store struct {
// contains filtered or unexported fields
}

func NewStore(broker *Broker, auditWriter *db.AuditWriter) *Store

NewStore creates an empty event store. If broker is non-nil, state changes are broadcast to SSE clients. The auditWriter handles mode-gated persistence of events to the audit log.

func (s *Store) ActiveContainers() []string

ActiveContainers returns the names of all containers that have sent at least one event (i.e. have an entry in lastEvents).

func (s *Store) AggregateContainerAttention(containerName string) (needsInput bool, highest event.NotificationType)

AggregateContainerAttention returns the highest-priority attention state across all worktrees for a container. An internal (lowercase) variant is used by callers that already hold the store's lock; this public variant acquires its own read lock.
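
The public/internal split is a common Go locking idiom: the exported method takes the lock and delegates to an unexported helper that assumes the lock is already held. The sketch below illustrates that idiom only. `attentionStore`, `aggregateLocked`, and the integer `priority` ranking are hypothetical; the real method returns an event.NotificationType whose ordering is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// worktreeAttention is a hypothetical, simplified attention record.
type worktreeAttention struct {
	needsInput bool
	priority   int // assumed ranking: a larger value means more urgent
}

type attentionStore struct {
	mu        sync.RWMutex
	worktrees map[string]map[string]worktreeAttention // container -> worktree -> state
}

// aggregateLocked assumes the caller already holds s.mu (read or write).
func (s *attentionStore) aggregateLocked(container string) (needsInput bool, highest int) {
	for _, wt := range s.worktrees[container] {
		if !wt.needsInput {
			continue // worktrees that need no input do not contribute
		}
		needsInput = true
		if wt.priority > highest {
			highest = wt.priority
		}
	}
	return needsInput, highest
}

// Aggregate is the exported variant: it takes its own read lock, then delegates.
func (s *attentionStore) Aggregate(container string) (bool, int) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.aggregateLocked(container)
}

// demoAggregate builds a small store and aggregates one container.
func demoAggregate() (bool, int) {
	s := &attentionStore{worktrees: map[string]map[string]worktreeAttention{
		"warden-a": {
			"main":    {needsInput: true, priority: 1},
			"feature": {needsInput: true, priority: 3},
			"idle":    {needsInput: false, priority: 9}, // ignored: no input needed
		},
	}}
	return s.Aggregate("warden-a")
}

func main() {
	needsInput, highest := demoAggregate()
	fmt.Println(needsInput, highest)
}
```

Keeping the locked helper unexported avoids re-entrant locking, which would deadlock with sync.RWMutex if a writer is waiting between two read-lock acquisitions.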

func (s *Store) BroadcastBudgetContainerStopped(ref event.ProjectRef, containerID string, totalCost, budget float64)

BroadcastBudgetContainerStopped sends a budget_container_stopped SSE event after a container is stopped due to budget enforcement, so frontends can redirect users away from the now-stopped project.

func (s *Store) BroadcastBudgetExceeded(ref event.ProjectRef, totalCost, budget float64)

BroadcastBudgetExceeded sends a budget_exceeded SSE event to all connected frontends so they can show a notification.

func (s *Store) BroadcastWorktreeListChanged(ref event.ProjectRef)

BroadcastWorktreeListChanged sends a worktree_list_changed event to all SSE clients so they can refresh the worktree list for the given container.

func (s *Store) EvictWorktree(containerName, worktreeID string)

EvictWorktree removes all cached state (attention + terminal) for a single worktree. Called after a worktree is removed or cleaned up so the UI stops showing stale entries. Broadcasts a cleared state event so connected frontends update immediately.

func (s *Store) GetContainerWorktreeStates(containerName string) map[string]WorktreeState

GetContainerWorktreeStates returns all worktree attention states for a container. Used by the service layer to aggregate attention across worktrees at the project level.

func (s *Store) GetProjectCost(containerName string) ProjectCost

GetProjectCost returns the cost state for a container. Returns zero value if no cost data exists.

func (s *Store) GetTerminalState(containerName, worktreeID string) TerminalState

GetTerminalState returns the terminal lifecycle state for a worktree. Returns zero value if no terminal state exists.

func (s *Store) GetWorktreeState(containerName, worktreeID string) WorktreeState

GetWorktreeState returns the attention state for a worktree. Returns zero value if no state exists.

func (s *Store) HandleEvent(evt event.ContainerEvent)

HandleEvent processes a container event, updates state, and broadcasts changes to SSE clients. This is the callback passed to the Watcher.

func (s *Store) HasTerminalData(containerName string) bool

HasTerminalData reports whether the store has any terminal lifecycle entries for the given container. O(1) lookup via secondary index.
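
One way to get a constant-time per-container check is to keep a counter map next to the primary map. The source only says "secondary index", so the shape below is an assumption; `terminalIndex` and its methods are hypothetical.

```go
package main

import "fmt"

// terminalKey identifies one worktree's terminal entry.
type terminalKey struct{ container, worktree string }

// terminalIndex pairs a primary map with a per-container counter so that
// "does this container have any entries?" needs no scan.
type terminalIndex struct {
	entries      map[terminalKey]struct{}
	perContainer map[string]int // secondary index: entry count per container
}

func newTerminalIndex() *terminalIndex {
	return &terminalIndex{
		entries:      make(map[terminalKey]struct{}),
		perContainer: make(map[string]int),
	}
}

// add inserts an entry and bumps the counter only for new keys.
func (t *terminalIndex) add(container, worktree string) {
	key := terminalKey{container, worktree}
	if _, exists := t.entries[key]; exists {
		return
	}
	t.entries[key] = struct{}{}
	t.perContainer[container]++
}

// remove deletes an entry and keeps the counter in step with the primary map.
func (t *terminalIndex) remove(container, worktree string) {
	key := terminalKey{container, worktree}
	if _, exists := t.entries[key]; !exists {
		return
	}
	delete(t.entries, key)
	t.perContainer[container]--
	if t.perContainer[container] == 0 {
		delete(t.perContainer, container)
	}
}

// has is a single map lookup regardless of how many worktrees exist.
func (t *terminalIndex) has(container string) bool {
	return t.perContainer[container] > 0
}

// demoIndex checks a container before and after its only entry is removed.
func demoIndex() (before, after bool) {
	idx := newTerminalIndex()
	idx.add("warden-a", "main")
	before = idx.has("warden-a")
	idx.remove("warden-a", "main")
	after = idx.has("warden-a")
	return before, after
}

func main() {
	before, after := demoIndex()
	fmt.Println(before, after)
}
```

The cost of this design is that every write path must update both maps together, which is why such an index is usually maintained under the same lock as the primary data.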

func (s *Store) LastEventTime(containerName string) time.Time

LastEventTime returns the timestamp of the most recent event from the given container. Returns zero time if no events received.

func (s *Store) MarkContainerStale(containerName string)

MarkContainerStale clears all worktree states for a container that has stopped sending heartbeats, and broadcasts updates to frontends.

func (s *Store) RemoveContainer(containerName string)

RemoveContainer clears all in-memory state for a container without triggering the stale callback. Called when a container is deliberately deleted. This prevents the liveness checker from later finding a stale entry for the old container name and inadvertently stopping a newly created container’s session watcher.

func (s *Store) SeedWorktreeBaseline(containerName, worktreeID string)

SeedWorktreeBaseline initializes the attention state for a worktree with UpdatedAt set to the current time. This prevents historical JSONL events (replayed during session watcher catch-up) from setting stale attention state, because handleTurnComplete’s timestamp check rejects events older than UpdatedAt. No-op if state already exists for this worktree.
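
The baseline mechanism can be sketched as a timestamp guard: seeding records "now", and later updates are applied only if their event time is not older than the stored UpdatedAt. This is an illustration under assumptions; `baselineStore`, `seed`, and `apply` are hypothetical names, and locking is omitted for brevity.

```go
package main

import (
	"fmt"
	"time"
)

// attention is a simplified per-worktree attention record.
type attention struct {
	needsInput bool
	updatedAt  time.Time
}

// baselineStore is a hypothetical store keyed by "container/worktree".
type baselineStore struct {
	states map[string]attention
}

// seed records a baseline at now; it is a no-op if state already exists.
func (s *baselineStore) seed(key string, now time.Time) {
	if _, exists := s.states[key]; exists {
		return
	}
	s.states[key] = attention{updatedAt: now}
}

// apply updates the state unless the event predates the stored UpdatedAt.
// It reports whether the event was applied.
func (s *baselineStore) apply(key string, needsInput bool, eventTime time.Time) bool {
	if cur, exists := s.states[key]; exists && eventTime.Before(cur.updatedAt) {
		return false // replayed history: older than the baseline
	}
	s.states[key] = attention{needsInput: needsInput, updatedAt: eventTime}
	return true
}

// demoBaseline seeds a worktree, then offers one replayed and one live event.
func demoBaseline() (replayedApplied, liveApplied bool) {
	s := &baselineStore{states: make(map[string]attention)}
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	s.seed("warden-a/main", now)
	replayedApplied = s.apply("warden-a/main", true, now.Add(-time.Hour))
	liveApplied = s.apply("warden-a/main", true, now.Add(time.Second))
	return replayedApplied, liveApplied
}

func main() {
	replayed, live := demoBaseline()
	fmt.Println(replayed, live)
}
```

Without the seed, the first replayed event would find no existing state and be accepted, which is the stale-attention case the baseline is meant to prevent.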

func (s *Store) SetAliveCallback(fn AliveCallbackFunc)

SetAliveCallback registers a function called when a container transitions from unknown/stale to alive. Only fires on lifecycle events (heartbeat or session_start), not on arbitrary events that might arrive for a container before it is fully running.

func (s *Store) SetCostUpdateCallback(fn CostUpdateCallbackFunc)

SetCostUpdateCallback registers a function called on every cost update event. The callback receives cumulative cost data (which may be zero/empty) and is responsible for both persistence and budget enforcement. This is the single integration point between the event bus and the service layer’s cost/budget system.

func (s *Store) SetStaleCallback(fn StaleCallbackFunc)

SetStaleCallback registers a function called when a container’s heartbeat goes stale. The service layer implements this to write an audit entry with full project context.

TerminalState holds push-based terminal lifecycle data for a worktree.

type TerminalState struct {
	// SessionAlive is true when the tmux session is running.
	SessionAlive bool
	// ViewerConnected is true when a browser is connected via WebSocket.
	ViewerConnected bool
	// ExitCode is Claude's exit code (-1 means not set / still running).
	ExitCode int
	// UpdatedAt is when this state was last changed.
	UpdatedAt time.Time
}

func (ts *TerminalState) DeriveWorktreeState() engine.WorktreeState

DeriveWorktreeState maps terminal process liveness to a WorktreeState. Returns empty string if the terminal state has never been set.

WorktreeState holds the real-time state for a single worktree, derived from container hook events pushed via the event bus.

type WorktreeState struct {
	// NeedsInput is true when Claude is waiting for user attention.
	NeedsInput bool
	// NotificationType indicates why attention is needed.
	NotificationType event.NotificationType
	// SessionActive is true when a Claude session is running in this worktree.
	SessionActive bool
	// UpdatedAt is when this state was last changed.
	UpdatedAt time.Time
}

WorktreeStatePayload is the JSON shape sent over SSE for worktree_state events. Shared by all broadcast helpers to keep the Go and TypeScript types in sync.

type WorktreeStatePayload struct {
	event.ProjectRef
	WorktreeID       string                 `json:"worktreeId"`
	NeedsInput       bool                   `json:"needsInput"`
	NotificationType event.NotificationType `json:"notificationType,omitempty"`
	SessionActive    bool                   `json:"sessionActive"`
	State            engine.WorktreeState   `json:"state,omitempty"`
	ExitCode         int                    `json:"exitCode,omitempty"`
}
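
The struct tags determine what a frontend actually receives. The sketch below uses a local mirror of the payload to show the effect of `omitempty` with encoding/json. It makes several assumptions: the embedded event.ProjectRef is left out because its fields are not shown here, NotificationType and State are modelled as plain strings, and the `"exited"` state value is invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload mirrors the tagged fields of WorktreeStatePayload for illustration.
// The embedded event.ProjectRef is intentionally omitted.
type payload struct {
	WorktreeID       string `json:"worktreeId"`
	NeedsInput       bool   `json:"needsInput"`
	NotificationType string `json:"notificationType,omitempty"`
	SessionActive    bool   `json:"sessionActive"`
	State            string `json:"state,omitempty"`
	ExitCode         int    `json:"exitCode,omitempty"`
}

// encode marshals p to a JSON string, returning "" on error.
func encode(p payload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Zero-valued omitempty fields are left out of the output entirely.
	fmt.Println(encode(payload{WorktreeID: "main", SessionActive: true}))

	// A non-zero exit code is emitted; an exit code of 0 would be omitted.
	fmt.Println(encode(payload{WorktreeID: "main", State: "exited", ExitCode: 1}))
}
```

Because `omitempty` drops an int of 0, a client reading this shape cannot tell a successful exit (code 0) from an absent exit code by that field alone and would have to rely on other fields such as the state.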

Generated by gomarkdoc