Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
f14e0db
feat(realtime): add pipeline.compaction config + resolution
mudler Jun 22, 2026
ae1adab
refactor(realtime): extract itemID helper, reuse in item.retrieve
mudler Jun 22, 2026
237b701
test(realtime): drop duplicate Ginkgo bootstrap, fold specs into open…
mudler Jun 22, 2026
13f6880
feat(realtime): implement conversation.item.delete
mudler Jun 22, 2026
12bddd9
feat(realtime): implement input_audio_buffer.clear
mudler Jun 22, 2026
dc6e329
feat(realtime): implement conversation.item.truncate (text)
mudler Jun 22, 2026
25decca
feat(realtime): add Conversation.Memory + pair-safe compactionCut
mudler Jun 22, 2026
582b7e6
fix(realtime): compactionCut returns 0 for keep<=0 (no-cap sentinel, …
mudler Jun 22, 2026
2a84d00
style(realtime): gofmt compaction test helper closures
mudler Jun 22, 2026
63bf24f
feat(realtime): inject rolling memory into the prompt + summary builders
mudler Jun 22, 2026
29e1176
feat(realtime): server-side summarize-then-drop compactor
mudler Jun 22, 2026
c967c2f
test(realtime): unit-test prefixMatches eviction-safety predicate
mudler Jun 22, 2026
09909c1
feat(realtime): resolve summarizer model + schedule compaction per turn
mudler Jun 22, 2026
6c567dd
docs(realtime): document conversation compaction + new item events
mudler Jun 22, 2026
487868e
fix(realtime): resolve summary model inside compaction goroutine (laz…
mudler Jun 22, 2026
a5f84be
refactor(realtime): reuse reasoning.ExtractReasoningComplete for summ…
mudler Jun 22, 2026
f6edc05
fix(config): register pipeline.compaction fields in meta registry
mudler Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/config/meta/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,36 @@ func DefaultRegistry() map[string]FieldMetaOverride {
Component: "number",
Order: 79,
},
"pipeline.compaction.enabled": {
Section: "pipeline",
Label: "Compaction Enabled",
Description: "Fold conversation items that age out of the live window (Max History Items) into a rolling summary instead of dropping them, so long realtime sessions stay cheap without losing earlier context. Off by default.",
Component: "toggle",
Order: 80,
},
"pipeline.compaction.trigger_items": {
Section: "pipeline",
Label: "Compaction Trigger Items",
Description: "High-water mark: once the live conversation exceeds this many items, the overflow above Max History Items is summarized and evicted. Must be greater than Max History Items; defaults to twice it. The gap controls how often summarization runs.",
Component: "number",
Order: 81,
},
"pipeline.compaction.summary_model": {
Section: "pipeline",
Label: "Compaction Summary Model",
Description: "Optional smaller/cheaper model used to produce the rolling summary. Empty reuses the pipeline's own LLM. On CPU, a tiny model here keeps compaction from competing with the conversation LLM.",
Component: "input",
Advanced: true,
Order: 82,
},
"pipeline.compaction.max_summary_tokens": {
Section: "pipeline",
Label: "Compaction Max Summary Tokens",
Description: "Advisory cap on the rolling summary length (fed to the summarizer prompt). Defaults to 512.",
Component: "number",
Advanced: true,
Order: 83,
},

// --- Functions ---
"function.grammar.parallel_calls": {
Expand Down
21 changes: 21 additions & 0 deletions core/config/model_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,32 @@ type Pipeline struct {
// context fills.
MaxHistoryItems *int `yaml:"max_history_items,omitempty" json:"max_history_items,omitempty"`

// Compaction folds conversation items that age out of the live window
// (max_history_items) into a rolling summary instead of dropping them, so
// long realtime sessions stay cheap without losing earlier context. Nil
// (block absent) means disabled, preserving existing behavior.
Compaction *PipelineCompaction `yaml:"compaction,omitempty" json:"compaction,omitempty"`

// VoiceRecognition gates the pipeline behind speaker verification. Nil
// (block absent) means no gate, preserving existing behavior.
VoiceRecognition *PipelineVoiceRecognition `yaml:"voice_recognition,omitempty" json:"voice_recognition,omitempty"`
}

// PipelineCompaction configures summarize-then-drop for a realtime pipeline:
// conversation items that age out of the live window (max_history_items) are
// folded into a rolling summary instead of being dropped outright.
//
// It is referenced from Pipeline.Compaction as a pointer; a nil pointer there
// (block absent from the config) means compaction is disabled. Every field
// carries omitempty, so zero values are left out when the config is marshalled.
type PipelineCompaction struct {
// Enabled turns summarize-then-drop on. Default false.
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"`
// TriggerItems is the high-water mark: once live items exceed it, overflow
// above max_history_items is summarized and evicted. Must exceed
// max_history_items; clamped up if not. Default: 2x max_history_items.
// NOTE(review): presumably a zero value is what selects the default; the
// resolution logic is not in this block — confirm against resolveCompaction.
TriggerItems int `yaml:"trigger_items,omitempty" json:"trigger_items,omitempty"`
// SummaryModel optionally names a smaller/cheaper model for the summary
// call. Empty uses the pipeline's own LLM.
SummaryModel string `yaml:"summary_model,omitempty" json:"summary_model,omitempty"`
// MaxSummaryTokens advises the summary length (fed to the prompt). Default 512.
// It is advisory only: the value goes into the summarizer prompt.
// NOTE(review): whether anything enforces it as a hard limit is not visible
// here — confirm.
MaxSummaryTokens int `yaml:"max_summary_tokens,omitempty" json:"max_summary_tokens,omitempty"`
}

// ApplyReasoningEffort resolves the effective reasoning effort — a per-request
// value (requestEffort) overrides the config's own ReasoningEffort default —
// stores it on the config so gRPCPredictOpts forwards it to the backend as the
Expand Down
99 changes: 80 additions & 19 deletions core/http/endpoints/openai/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"net/http"
Expand Down Expand Up @@ -134,6 +135,18 @@ type Session struct {
// pairs are kept together so we never feed an orphaned tool result.
MaxHistoryItems int

// Compaction settings resolved from pipeline.compaction (see resolveCompaction).
CompactionEnabled bool
CompactionTrigger int
SummaryModel string
MaxSummaryTokens int

// summarizerFactory lazily builds the model used for compaction summaries
// when summary_model is configured; nil means reuse the pipeline LLM.
summarizerFactory func() (Model, error)
summarizerOnce sync.Once
summarizerCached Model

// AssistantExecutor is non-nil when the session opted into the in-process
// LocalAI Assistant tool surface. Tool calls whose name matches this
// executor's catalog are run inproc and their output is fed back to the
Expand Down Expand Up @@ -241,6 +254,12 @@ type Conversation struct {
ID string
Items []*types.MessageItemUnion
Lock sync.Mutex
// Memory is the rolling summary of items already evicted by compaction. It
// is kept out of Items (so trimRealtimeItems never drops it) and rendered
// as a system message right after the session instructions.
Memory string
// compacting ensures at most one background compaction runs per conversation.
compacting atomic.Bool
}

func (c *Conversation) ToServer() types.Conversation {
Expand Down Expand Up @@ -540,13 +559,12 @@ func runRealtimeSession(application *application.Application, t Transport, model
SoundDetectionWindowMs: cfg.Pipeline.SoundDetectionWindowMs,
SoundDetectionHopMs: cfg.Pipeline.SoundDetectionHopMs,
}
session.CompactionEnabled, session.CompactionTrigger, session.MaxSummaryTokens, session.SummaryModel = resolveCompaction(cfg, session.MaxHistoryItems)

// Create a default conversation
conversationID := generateConversationID()
conversation := &Conversation{
ID: conversationID,
// TODO: We need to truncate the conversation items when a new item is added and we have run out of space. There are multiple places where items
// can be added so we could use a datastructure here that enforces truncation upon addition
ID: conversationID,
Items: []*types.MessageItemUnion{},
}
session.Conversations[conversationID] = conversation
Expand Down Expand Up @@ -577,6 +595,18 @@ func runRealtimeSession(application *application.Application, t Transport, model
}
session.ModelInterface = m

if session.SummaryModel != "" {
summaryModelName := session.SummaryModel
sid := sessionID
session.summarizerFactory = func() (Model, error) {
summaryCfg, lerr := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(summaryModelName, application.ApplicationConfig())
if lerr != nil {
return nil, fmt.Errorf("load summary model config %q: %w", summaryModelName, lerr)
}
return newModel(&summaryCfg.Pipeline, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), evaluator, buildRealtimeRoutingContext(application, sid))
}
}

if cfg.Pipeline.VoiceGateEnabled() {
gate, gerr := newVoiceGate(
*cfg.Pipeline.VoiceRecognition,
Expand Down Expand Up @@ -807,6 +837,15 @@ func runRealtimeSession(application *application.Application, t Transport, model
commitUtterance(respCtx, allAudio, session, conversation, t)
}()

case types.InputAudioBufferClearEvent:
xlog.Debug("recv", "message", string(msg))
// Discard a partially-captured utterance so the client can restart
// input cleanly without the stale buffer leaking into the next commit.
clearInputAudio(session)
sendEvent(t, types.InputAudioBufferClearedEvent{
ServerEventBase: types.ServerEventBase{EventID: e.EventID},
})

case types.ConversationItemCreateEvent:
xlog.Debug("recv", "message", string(msg))
// Add the item to the conversation
Expand Down Expand Up @@ -841,7 +880,39 @@ func runRealtimeSession(application *application.Application, t Transport, model
})

case types.ConversationItemDeleteEvent:
sendError(t, "not_implemented", "Deleting items not implemented", "", "event_TODO")
xlog.Debug("recv", "message", string(msg))
if e.ItemID == "" {
sendError(t, "invalid_item_id", "Need item_id, but none specified", "", "event_TODO")
continue
}
conversation.Lock.Lock()
updated, ok := deleteItem(conversation.Items, e.ItemID)
conversation.Items = updated
conversation.Lock.Unlock()
if !ok {
sendError(t, "invalid_item_id", "Item to delete not found", "", "event_TODO")
continue
}
sendEvent(t, types.ConversationItemDeletedEvent{
ServerEventBase: types.ServerEventBase{EventID: e.EventID},
ItemID: e.ItemID,
})

case types.ConversationItemTruncateEvent:
xlog.Debug("recv", "message", string(msg))
conversation.Lock.Lock()
ok := truncateAssistantText(conversation.Items, e.ItemID, e.ContentIndex)
conversation.Lock.Unlock()
if !ok {
sendError(t, "invalid_item_id", "Item to truncate not found", "", "event_TODO")
continue
}
sendEvent(t, types.ConversationItemTruncatedEvent{
ServerEventBase: types.ServerEventBase{EventID: e.EventID},
ItemID: e.ItemID,
ContentIndex: e.ContentIndex,
AudioEndMs: e.AudioEndMs,
})

case types.ConversationItemRetrieveEvent:
xlog.Debug("recv", "message", string(msg))
Expand All @@ -854,21 +925,7 @@ func runRealtimeSession(application *application.Application, t Transport, model
conversation.Lock.Lock()
var retrievedItem types.MessageItemUnion
for _, item := range conversation.Items {
// We need to check ID in the union
var id string
if item.System != nil {
id = item.System.ID
} else if item.User != nil {
id = item.User.ID
} else if item.Assistant != nil {
id = item.Assistant.ID
} else if item.FunctionCall != nil {
id = item.FunctionCall.ID
} else if item.FunctionCallOutput != nil {
id = item.FunctionCallOutput.ID
}

if id == e.ItemID {
if itemID(item) == e.ItemID {
retrievedItem = *item
break
}
Expand Down Expand Up @@ -1666,6 +1723,9 @@ const maxAssistantToolTurns = 10

// triggerResponse is the top-level entry point for producing a response on
// conv: it delegates to triggerResponseAtTurn with the tool-turn counter at 0,
// then gives the session a chance to compact the conversation.
//
// overrides is passed through unchanged to triggerResponseAtTurn.
func triggerResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams) {
// 0 is the initial toolTurn value: this call starts a fresh response rather
// than continuing a tool-call follow-up round.
triggerResponseAtTurn(ctx, session, conv, t, overrides, 0)
// Fold aged-out turns into the rolling memory off the critical path; the
// next turn reaps the smaller buffer.
// NOTE(review): maybeCompact is invoked only after the response call above
// has returned; presumably it schedules background work and returns
// quickly — confirm against its definition.
session.maybeCompact(conv)
}

func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) {
Expand Down Expand Up @@ -1721,6 +1781,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
var lastUserSpeaker *types.Speaker
personalize := session.voiceGate != nil && session.voiceGate.cfg.PersonalizeEnabled()
conv.Lock.Lock()
conversationHistory = withMemory(conversationHistory, conv.Memory)
items := trimRealtimeItems(conv.Items, session.MaxHistoryItems)
for _, item := range items {
if item.User != nil {
Expand Down
Loading
Loading