From f14e0db5283108cf79700affd63fa2d89416a896 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:38:27 +0000 Subject: [PATCH 01/17] feat(realtime): add pipeline.compaction config + resolution Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/config/model_config.go | 21 +++++++++ core/http/endpoints/openai/realtime.go | 7 +++ .../endpoints/openai/realtime_compaction.go | 31 +++++++++++++ .../openai/realtime_compaction_test.go | 46 +++++++++++++++++++ 4 files changed, 105 insertions(+) create mode 100644 core/http/endpoints/openai/realtime_compaction.go create mode 100644 core/http/endpoints/openai/realtime_compaction_test.go diff --git a/core/config/model_config.go b/core/config/model_config.go index cbb33683838b..8886ddfd5a5d 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -641,11 +641,32 @@ type Pipeline struct { // context fills. MaxHistoryItems *int `yaml:"max_history_items,omitempty" json:"max_history_items,omitempty"` + // Compaction folds conversation items that age out of the live window + // (max_history_items) into a rolling summary instead of dropping them, so + // long realtime sessions stay cheap without losing earlier context. Nil + // (block absent) means disabled, preserving existing behavior. + Compaction *PipelineCompaction `yaml:"compaction,omitempty" json:"compaction,omitempty"` + // VoiceRecognition gates the pipeline behind speaker verification. Nil // (block absent) means no gate, preserving existing behavior. VoiceRecognition *PipelineVoiceRecognition `yaml:"voice_recognition,omitempty" json:"voice_recognition,omitempty"` } +// PipelineCompaction configures summarize-then-drop for a realtime pipeline. +type PipelineCompaction struct { + // Enabled turns summarize-then-drop on. Default false. 
+ Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"` + // TriggerItems is the high-water mark: once live items exceed it, overflow + // above max_history_items is summarized and evicted. Must exceed + // max_history_items; clamped up if not. Default: 2x max_history_items. + TriggerItems int `yaml:"trigger_items,omitempty" json:"trigger_items,omitempty"` + // SummaryModel optionally names a smaller/cheaper model for the summary + // call. Empty uses the pipeline's own LLM. + SummaryModel string `yaml:"summary_model,omitempty" json:"summary_model,omitempty"` + // MaxSummaryTokens advises the summary length (fed to the prompt). Default 512. + MaxSummaryTokens int `yaml:"max_summary_tokens,omitempty" json:"max_summary_tokens,omitempty"` +} + // ApplyReasoningEffort resolves the effective reasoning effort — a per-request // value (requestEffort) overrides the config's own ReasoningEffort default — // stores it on the config so gRPCPredictOpts forwards it to the backend as the diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 1af4c6b75dee..4eb68772cad0 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -134,6 +134,12 @@ type Session struct { // pairs are kept together so we never feed an orphaned tool result. MaxHistoryItems int + // Compaction settings resolved from pipeline.compaction (see resolveCompaction). + CompactionEnabled bool + CompactionTrigger int + SummaryModel string + MaxSummaryTokens int + // AssistantExecutor is non-nil when the session opted into the in-process // LocalAI Assistant tool surface. 
Tool calls whose name matches this // executor's catalog are run inproc and their output is fed back to the @@ -540,6 +546,7 @@ func runRealtimeSession(application *application.Application, t Transport, model SoundDetectionWindowMs: cfg.Pipeline.SoundDetectionWindowMs, SoundDetectionHopMs: cfg.Pipeline.SoundDetectionHopMs, } + session.CompactionEnabled, session.CompactionTrigger, session.MaxSummaryTokens, session.SummaryModel = resolveCompaction(cfg, session.MaxHistoryItems) // Create a default conversation conversationID := generateConversationID() diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go new file mode 100644 index 000000000000..d72dfea3bed0 --- /dev/null +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -0,0 +1,31 @@ +package openai + +import ( + "github.com/mudler/LocalAI/core/config" +) + +const ( + defaultMaxSummaryTokens = 512 +) + +// resolveCompaction reads the pipeline.compaction block, applying defaults and +// the trigger>max_history invariant. maxHistory is the already-resolved live +// window size. Returns enabled=false (and zero values) when compaction is off. 
+func resolveCompaction(cfg *config.ModelConfig, maxHistory int) (enabled bool, trigger, maxSummaryTokens int, summaryModel string) { + if cfg == nil || cfg.Pipeline.Compaction == nil || !cfg.Pipeline.Compaction.Enabled { + return false, 0, 0, "" + } + c := cfg.Pipeline.Compaction + trigger = c.TriggerItems + if trigger <= 0 { + trigger = maxHistory * 2 + } + if trigger <= maxHistory { + trigger = maxHistory + 1 + } + maxSummaryTokens = c.MaxSummaryTokens + if maxSummaryTokens <= 0 { + maxSummaryTokens = defaultMaxSummaryTokens + } + return true, trigger, maxSummaryTokens, c.SummaryModel +} diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go new file mode 100644 index 000000000000..9e79b21c2395 --- /dev/null +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -0,0 +1,46 @@ +package openai + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" +) + +func TestRealtimeCompaction(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Realtime Compaction Suite") +} + +var _ = Describe("resolveCompaction", func() { + It("disables when the block is absent", func() { + enabled, _, _, _ := resolveCompaction(&config.ModelConfig{}, 6) + Expect(enabled).To(BeFalse()) + }) + + It("defaults trigger to 2x max history and tokens to 512", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{Enabled: true}}} + enabled, trigger, maxTok, _ := resolveCompaction(cfg, 6) + Expect(enabled).To(BeTrue()) + Expect(trigger).To(Equal(12)) + Expect(maxTok).To(Equal(512)) + }) + + It("clamps trigger to max history + 1 when misconfigured", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{Enabled: true, TriggerItems: 4}}} + _, trigger, _, _ := resolveCompaction(cfg, 6) + Expect(trigger).To(Equal(7)) + }) + + It("honors explicit 
values", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{ + Enabled: true, TriggerItems: 20, MaxSummaryTokens: 256, SummaryModel: "tiny"}}} + enabled, trigger, maxTok, model := resolveCompaction(cfg, 6) + Expect(enabled).To(BeTrue()) + Expect(trigger).To(Equal(20)) + Expect(maxTok).To(Equal(256)) + Expect(model).To(Equal("tiny")) + }) +}) From ae1adab9e4c0c9dc3d922c11eee511fd946a3c05 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:42:28 +0000 Subject: [PATCH 02/17] refactor(realtime): extract itemID helper, reuse in item.retrieve Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 16 +------------- .../endpoints/openai/realtime_compaction.go | 21 +++++++++++++++++++ .../openai/realtime_compaction_test.go | 12 +++++++++++ 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 4eb68772cad0..44f2e9628305 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -861,21 +861,7 @@ func runRealtimeSession(application *application.Application, t Transport, model conversation.Lock.Lock() var retrievedItem types.MessageItemUnion for _, item := range conversation.Items { - // We need to check ID in the union - var id string - if item.System != nil { - id = item.System.ID - } else if item.User != nil { - id = item.User.ID - } else if item.Assistant != nil { - id = item.Assistant.ID - } else if item.FunctionCall != nil { - id = item.FunctionCall.ID - } else if item.FunctionCallOutput != nil { - id = item.FunctionCallOutput.ID - } - - if id == e.ItemID { + if itemID(item) == e.ItemID { retrievedItem = *item break } diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index d72dfea3bed0..3a3a4f706bae 100644 --- 
a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -2,12 +2,33 @@ package openai import ( "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" ) const ( defaultMaxSummaryTokens = 512 ) +// itemID extracts the id from any MessageItemUnion variant ("" if none). +func itemID(item *types.MessageItemUnion) string { + switch { + case item == nil: + return "" + case item.System != nil: + return item.System.ID + case item.User != nil: + return item.User.ID + case item.Assistant != nil: + return item.Assistant.ID + case item.FunctionCall != nil: + return item.FunctionCall.ID + case item.FunctionCallOutput != nil: + return item.FunctionCallOutput.ID + default: + return "" + } +} + // resolveCompaction reads the pipeline.compaction block, applying defaults and // the trigger>max_history invariant. maxHistory is the already-resolved live // window size. Returns enabled=false (and zero values) when compaction is off. diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 9e79b21c2395..e77ed01c2e0a 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -7,6 +7,7 @@ import ( . 
"github.com/onsi/gomega" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" ) func TestRealtimeCompaction(t *testing.T) { @@ -44,3 +45,14 @@ var _ = Describe("resolveCompaction", func() { Expect(model).To(Equal("tiny")) }) }) + +var _ = Describe("itemID", func() { + It("returns the id for each variant and empty for nil", func() { + Expect(itemID(nil)).To(Equal("")) + Expect(itemID(&types.MessageItemUnion{User: &types.MessageItemUser{ID: "u1"}})).To(Equal("u1")) + Expect(itemID(&types.MessageItemUnion{Assistant: &types.MessageItemAssistant{ID: "a1"}})).To(Equal("a1")) + Expect(itemID(&types.MessageItemUnion{System: &types.MessageItemSystem{ID: "s1"}})).To(Equal("s1")) + Expect(itemID(&types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: "f1"}})).To(Equal("f1")) + Expect(itemID(&types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: "o1"}})).To(Equal("o1")) + }) +}) From 237b70180c854109f233ded0b4f395ac8a912241 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:44:30 +0000 Subject: [PATCH 03/17] test(realtime): drop duplicate Ginkgo bootstrap, fold specs into openai suite Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime_compaction_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index e77ed01c2e0a..30a28a17b20b 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -1,8 +1,6 @@ package openai import ( - "testing" - . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -10,11 +8,6 @@ import ( "github.com/mudler/LocalAI/core/http/endpoints/openai/types" ) -func TestRealtimeCompaction(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Realtime Compaction Suite") -} - var _ = Describe("resolveCompaction", func() { It("disables when the block is absent", func() { enabled, _, _, _ := resolveCompaction(&config.ModelConfig{}, 6) From 13f6880ab50ad56db5ec3ba2d80ffe6a1f04c9ad Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:47:36 +0000 Subject: [PATCH 04/17] feat(realtime): implement conversation.item.delete Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 18 ++++++++++++++- .../endpoints/openai/realtime_compaction.go | 11 +++++++++ .../openai/realtime_compaction_test.go | 23 +++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 44f2e9628305..cd4537fd11f6 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -848,7 +848,23 @@ func runRealtimeSession(application *application.Application, t Transport, model }) case types.ConversationItemDeleteEvent: - sendError(t, "not_implemented", "Deleting items not implemented", "", "event_TODO") + xlog.Debug("recv", "message", string(msg)) + if e.ItemID == "" { + sendError(t, "invalid_item_id", "Need item_id, but none specified", "", "event_TODO") + continue + } + conversation.Lock.Lock() + updated, ok := deleteItem(conversation.Items, e.ItemID) + conversation.Items = updated + conversation.Lock.Unlock() + if !ok { + sendError(t, "invalid_item_id", "Item to delete not found", "", "event_TODO") + continue + } + sendEvent(t, types.ConversationItemDeletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + ItemID: e.ItemID, + }) case types.ConversationItemRetrieveEvent: xlog.Debug("recv", 
"message", string(msg)) diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index 3a3a4f706bae..458aa3aac603 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -29,6 +29,17 @@ func itemID(item *types.MessageItemUnion) string { } } +// deleteItem removes the item with id from items, returning the new slice and +// whether it was found. +func deleteItem(items []*types.MessageItemUnion, id string) ([]*types.MessageItemUnion, bool) { + for i, item := range items { + if itemID(item) == id { + return append(items[:i:i], items[i+1:]...), true + } + } + return items, false +} + // resolveCompaction reads the pipeline.compaction block, applying defaults and // the trigger>max_history invariant. maxHistory is the already-resolved live // window size. Returns enabled=false (and zero values) when compaction is off. diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 30a28a17b20b..ccf69983b4a5 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -39,6 +39,29 @@ var _ = Describe("resolveCompaction", func() { }) }) +var _ = Describe("deleteItem", func() { + mk := func(ids ...string) []*types.MessageItemUnion { + out := make([]*types.MessageItemUnion, len(ids)) + for i, id := range ids { + out[i] = &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + return out + } + + It("removes the item with the given id", func() { + items, ok := deleteItem(mk("a", "b", "c"), "b") + Expect(ok).To(BeTrue()) + Expect(len(items)).To(Equal(2)) + Expect(itemID(items[0])).To(Equal("a")) + Expect(itemID(items[1])).To(Equal("c")) + }) + + It("reports not found for an unknown id", func() { + _, ok := deleteItem(mk("a"), "zzz") + Expect(ok).To(BeFalse()) + }) +}) + var _ = Describe("itemID", func() { 
It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 12bddd9e8bb3c2898bb0a1064add86c944c39073 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:51:53 +0000 Subject: [PATCH 05/17] feat(realtime): implement input_audio_buffer.clear Add a handler for the input_audio_buffer.clear client event that discards a partially-captured utterance (raw PCM + buffered Opus frames) via a unit-tested clearInputAudio helper, then acks with input_audio_buffer.cleared. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 9 +++++++++ core/http/endpoints/openai/realtime_compaction.go | 12 ++++++++++++ .../endpoints/openai/realtime_compaction_test.go | 9 +++++++++ 3 files changed, 30 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index cd4537fd11f6..55280027ad25 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -814,6 +814,15 @@ func runRealtimeSession(application *application.Application, t Transport, model commitUtterance(respCtx, allAudio, session, conversation, t) }() + case types.InputAudioBufferClearEvent: + xlog.Debug("recv", "message", string(msg)) + // Discard a partially-captured utterance so the client can restart + // input cleanly without the stale buffer leaking into the next commit. 
+ clearInputAudio(session) + sendEvent(t, types.InputAudioBufferClearedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + }) + case types.ConversationItemCreateEvent: xlog.Debug("recv", "message", string(msg)) // Add the item to the conversation diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index 458aa3aac603..bfdcc7c7069b 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -9,6 +9,18 @@ const ( defaultMaxSummaryTokens = 512 ) +// clearInputAudio resets the session's pending input audio buffer (the raw +// PCM and any buffered Opus frames). Used by the input_audio_buffer.clear +// realtime event so a client can discard a partially-captured utterance. +func clearInputAudio(s *Session) { + s.AudioBufferLock.Lock() + s.InputAudioBuffer = nil + s.AudioBufferLock.Unlock() + s.OpusFramesLock.Lock() + s.OpusFrames = nil + s.OpusFramesLock.Unlock() +} + // itemID extracts the id from any MessageItemUnion variant ("" if none). 
func itemID(item *types.MessageItemUnion) string { switch { diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index ccf69983b4a5..5d84f905abfd 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -62,6 +62,15 @@ var _ = Describe("deleteItem", func() { }) }) +var _ = Describe("clearInputAudio", func() { + It("resets the pending PCM and buffered Opus frames", func() { + s := &Session{InputAudioBuffer: []byte{1, 2, 3}, OpusFrames: [][]byte{{9}}} + clearInputAudio(s) + Expect(s.InputAudioBuffer).To(BeNil()) + Expect(s.OpusFrames).To(BeNil()) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From dc6e329b0217228b58a4282ab82cce2009b03643 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:55:33 +0000 Subject: [PATCH 06/17] feat(realtime): implement conversation.item.truncate (text) Clears both .Text and .Transcript of the assistant content part at contentIndex so barge-in truncation also works for audio turns whose spoken words live in .Transcript. 
Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 16 +++++++++++ .../endpoints/openai/realtime_compaction.go | 18 ++++++++++++ .../openai/realtime_compaction_test.go | 28 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 55280027ad25..6ee0621b50a8 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -875,6 +875,22 @@ func runRealtimeSession(application *application.Application, t Transport, model ItemID: e.ItemID, }) + case types.ConversationItemTruncateEvent: + xlog.Debug("recv", "message", string(msg)) + conversation.Lock.Lock() + ok := truncateAssistantText(conversation.Items, e.ItemID, e.ContentIndex) + conversation.Lock.Unlock() + if !ok { + sendError(t, "invalid_item_id", "Item to truncate not found", "", "event_TODO") + continue + } + sendEvent(t, types.ConversationItemTruncatedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + ItemID: e.ItemID, + ContentIndex: e.ContentIndex, + AudioEndMs: e.AudioEndMs, + }) + case types.ConversationItemRetrieveEvent: xlog.Debug("recv", "message", string(msg)) diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index bfdcc7c7069b..5b41475d7ef9 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -52,6 +52,24 @@ func deleteItem(items []*types.MessageItemUnion, id string) ([]*types.MessageIte return items, false } +// truncateAssistantText clears the text of the assistant item's content part at +// contentIndex. Minimal truncate: used to discard an interrupted/barge-in +// response tail. Both .Text and .Transcript are cleared because realtime audio +// turns store the spoken words in .Transcript (clearing only .Text would no-op). 
+func truncateAssistantText(items []*types.MessageItemUnion, id string, contentIndex int) bool { + for _, item := range items { + if itemID(item) != id || item.Assistant == nil { + continue + } + if contentIndex >= 0 && contentIndex < len(item.Assistant.Content) { + item.Assistant.Content[contentIndex].Text = "" + item.Assistant.Content[contentIndex].Transcript = "" + } + return true + } + return false +} + // resolveCompaction reads the pipeline.compaction block, applying defaults and // the trigger>max_history invariant. maxHistory is the already-resolved live // window size. Returns enabled=false (and zero values) when compaction is off. diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 5d84f905abfd..694cb470f1e8 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -71,6 +71,34 @@ var _ = Describe("clearInputAudio", func() { }) }) +var _ = Describe("truncateAssistantText", func() { + It("clears the text of the assistant content part at the index", func() { + items := []*types.MessageItemUnion{{Assistant: &types.MessageItemAssistant{ + ID: "a1", + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeText, Text: "hello world"}}, + }}} + ok := truncateAssistantText(items, "a1", 0) + Expect(ok).To(BeTrue()) + Expect(items[0].Assistant.Content[0].Text).To(Equal("")) + }) + + // Realtime assistant *audio* turns store the spoken words in .Transcript, not + // .Text, so a barge-in truncate must clear .Transcript too or it would no-op. 
+ It("clears the transcript of an assistant audio content part", func() { + items := []*types.MessageItemUnion{{Assistant: &types.MessageItemAssistant{ + ID: "a1", + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeAudio, Transcript: "hello world"}}, + }}} + ok := truncateAssistantText(items, "a1", 0) + Expect(ok).To(BeTrue()) + Expect(items[0].Assistant.Content[0].Transcript).To(Equal("")) + }) + + It("returns false for an unknown id", func() { + Expect(truncateAssistantText(nil, "nope", 0)).To(BeFalse()) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 25decca0dce57f57d02331ea36ad30328d99d92d Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 13:59:12 +0000 Subject: [PATCH 07/17] feat(realtime): add Conversation.Memory + pair-safe compactionCut Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 7 ++++++ .../endpoints/openai/realtime_compaction.go | 20 +++++++++++++++++ .../openai/realtime_compaction_test.go | 22 +++++++++++++++++++ 3 files changed, 49 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 6ee0621b50a8..385e22df86dd 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -12,6 +12,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" "net/http" @@ -247,6 +248,12 @@ type Conversation struct { ID string Items []*types.MessageItemUnion Lock sync.Mutex + // Memory is the rolling summary of items already evicted by compaction. It + // is kept out of Items (so trimRealtimeItems never drops it) and rendered + // as a system message right after the session instructions. + Memory string + // compacting ensures at most one background compaction runs per conversation. 
+ compacting atomic.Bool } func (c *Conversation) ToServer() types.Conversation { diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index 5b41475d7ef9..e99205298611 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -70,6 +70,26 @@ func truncateAssistantText(items []*types.MessageItemUnion, id string, contentIn return false } +// compactionCut returns the index splitting items into overflow (items[:cut], +// to be summarized+evicted) and the kept live tail (items[cut:]), keeping the +// last `keep` items. It mirrors trimRealtimeItems' pair-safety: the cut is +// pulled left so a function_call and its function_call_output are never split +// across the boundary (the whole pair lands in the kept tail). Returns 0 when +// there is nothing to cut. +func compactionCut(items []*types.MessageItemUnion, keep int) int { + if keep < 0 { + keep = 0 + } + cut := len(items) - keep + if cut <= 0 { + return 0 + } + for cut > 0 && items[cut] != nil && items[cut].FunctionCallOutput != nil { + cut-- + } + return cut +} + // resolveCompaction reads the pipeline.compaction block, applying defaults and // the trigger>max_history invariant. maxHistory is the already-resolved live // window size. Returns enabled=false (and zero values) when compaction is off. 
diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 694cb470f1e8..0bf8ce75ce56 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -99,6 +99,28 @@ var _ = Describe("truncateAssistantText", func() { }) }) +var _ = Describe("compactionCut", func() { + user := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} } + call := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: id}} } + out := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: id}} } + + It("cuts exactly len-keep when no pairs straddle the boundary", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3"), user("4")} + Expect(compactionCut(items, 2)).To(Equal(2)) + }) + + It("returns 0 when nothing to cut", func() { + Expect(compactionCut([]*types.MessageItemUnion{user("1")}, 2)).To(Equal(0)) + }) + + It("moves the boundary so a call/output pair is not split", func() { + // keep=2 -> naive cut=2, but items[2] is the output of items[1]'s call; + // pull the cut left so the whole pair stays in the kept tail. 
+ items := []*types.MessageItemUnion{user("1"), call("c"), out("c"), user("4")} + Expect(compactionCut(items, 2)).To(Equal(1)) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 582b7e605ce12e19d03d225b7c108b5435a76db5 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:01:40 +0000 Subject: [PATCH 08/17] fix(realtime): compactionCut returns 0 for keep<=0 (no-cap sentinel, avoids panic) Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime_compaction.go | 7 +++++-- core/http/endpoints/openai/realtime_compaction_test.go | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index e99205298611..f6c7a5a474bd 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -77,8 +77,11 @@ func truncateAssistantText(items []*types.MessageItemUnion, id string, contentIn // across the boundary (the whole pair lands in the kept tail). Returns 0 when // there is nothing to cut. func compactionCut(items []*types.MessageItemUnion, keep int) int { - if keep < 0 { - keep = 0 + // keep <= 0 means no live-window cap (the "unlimited history" sentinel, as + // in trimRealtimeItems): there is nothing to evict, so cut nothing. This + // also avoids indexing items[len(items)] in the pair-safety loop below. 
+ if keep <= 0 { + return 0 } cut := len(items) - keep if cut <= 0 { diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 0bf8ce75ce56..c580b8d0c1ef 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -113,6 +113,11 @@ var _ = Describe("compactionCut", func() { Expect(compactionCut([]*types.MessageItemUnion{user("1")}, 2)).To(Equal(0)) }) + It("returns 0 (cuts nothing) when keep is 0 — the unlimited-window sentinel", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3")} + Expect(compactionCut(items, 0)).To(Equal(0)) + }) + It("moves the boundary so a call/output pair is not split", func() { // keep=2 -> naive cut=2, but items[2] is the output of items[1]'s call; // pull the cut left so the whole pair stays in the kept tail. From 2a84d00812b2701a08606a5ff807224a51df5f6d Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:02:37 +0000 Subject: [PATCH 09/17] style(realtime): gofmt compaction test helper closures Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_compaction_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index c580b8d0c1ef..0975d62e58de 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -100,9 +100,15 @@ var _ = Describe("truncateAssistantText", func() { }) var _ = Describe("compactionCut", func() { - user := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} } - call := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: id}} } 
- out := func(id string) *types.MessageItemUnion { return &types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: id}} } + user := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + call := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: id}} + } + out := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: id}} + } It("cuts exactly len-keep when no pairs straddle the boundary", func() { items := []*types.MessageItemUnion{user("1"), user("2"), user("3"), user("4")} From 63bf24fed2213450e11b1f2cce94f8bbca351638 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:07:08 +0000 Subject: [PATCH 10/17] feat(realtime): inject rolling memory into the prompt + summary builders Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 1 + .../endpoints/openai/realtime_compaction.go | 81 +++++++++++++++++++ .../openai/realtime_compaction_test.go | 49 +++++++++++ 3 files changed, 131 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 385e22df86dd..e6e99cf4a76a 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1762,6 +1762,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa var lastUserSpeaker *types.Speaker personalize := session.voiceGate != nil && session.voiceGate.cfg.PersonalizeEnabled() conv.Lock.Lock() + conversationHistory = withMemory(conversationHistory, conv.Memory) items := trimRealtimeItems(conv.Items, session.MaxHistoryItems) for _, item := range items { if item.User != nil { diff --git a/core/http/endpoints/openai/realtime_compaction.go 
b/core/http/endpoints/openai/realtime_compaction.go index f6c7a5a474bd..a661d7e6f2cb 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -1,14 +1,95 @@ package openai import ( + "fmt" + "strings" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" ) const ( defaultMaxSummaryTokens = 512 + memoryPrefix = "Summary of earlier conversation:\n" ) +// withMemory inserts the rolling summary as a system message after the existing +// (instructions) history. No-op when memory is empty. +func withMemory(history schema.Messages, memory string) schema.Messages { + if memory == "" { + return history + } + content := memoryPrefix + memory + return append(history, schema.Message{ + Role: string(types.MessageRoleSystem), + StringContent: content, + Content: content, + }) +} + +// renderItemsTranscript renders conversation items as a plain "role: text" +// transcript for summarization. Non-text items (bare tool calls) are labelled +// so the summarizer keeps track of actions taken. +func renderItemsTranscript(items []*types.MessageItemUnion) string { + var b strings.Builder + for _, item := range items { + switch { + case item.User != nil: + b.WriteString("user: ") + for _, c := range item.User.Content { + if c.Text != "" { + b.WriteString(c.Text) + } + if c.Transcript != "" { + b.WriteString(c.Transcript) + } + } + b.WriteString("\n") + case item.Assistant != nil: + b.WriteString("assistant: ") + // Realtime assistant *audio* turns store the spoken words in + // .Transcript (not .Text), so emit both or spoken turns are dropped. 
+ for _, c := range item.Assistant.Content { + if c.Text != "" { + b.WriteString(c.Text) + } + if c.Transcript != "" { + b.WriteString(c.Transcript) + } + } + b.WriteString("\n") + case item.FunctionCall != nil: + b.WriteString(fmt.Sprintf("assistant called tool %s(%s)\n", item.FunctionCall.Name, item.FunctionCall.Arguments)) + case item.FunctionCallOutput != nil: + b.WriteString(fmt.Sprintf("tool result: %s\n", item.FunctionCallOutput.Output)) + } + } + return strings.TrimSpace(b.String()) +} + +// buildSummaryMessages builds the chat messages for the summarizer LLM: a system +// instruction plus prior memory and the new transcript to fold in. maxTokens is +// advisory (fed to the prompt; not hard-enforced in v1). +func buildSummaryMessages(priorMemory, transcript string, maxTokens int) schema.Messages { + system := fmt.Sprintf("You maintain a running memory of a live voice conversation. "+ + "Merge the prior memory with the new exchanges into an updated memory. "+ + "Keep names, decisions, facts, preferences, and open threads. Be concise "+ + "(under ~%d tokens). Output only the updated memory, with no reasoning or tags.", maxTokens) + var user strings.Builder + if priorMemory != "" { + user.WriteString("Prior memory:\n") + user.WriteString(priorMemory) + user.WriteString("\n\n") + } + user.WriteString("New exchanges to fold in:\n") + user.WriteString(transcript) + return schema.Messages{ + {Role: string(types.MessageRoleSystem), StringContent: system, Content: system}, + {Role: string(types.MessageRoleUser), StringContent: user.String(), Content: user.String()}, + } +} + // clearInputAudio resets the session's pending input audio buffer (the raw // PCM and any buffered Opus frames). Used by the input_audio_buffer.clear // realtime event so a client can discard a partially-captured utterance. 
diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 0975d62e58de..1a7af623cac9 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -6,6 +6,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" ) var _ = Describe("resolveCompaction", func() { @@ -132,6 +133,54 @@ var _ = Describe("compactionCut", func() { }) }) +var _ = Describe("withMemory", func() { + It("inserts a memory system message when memory is non-empty", func() { + base := schema.Messages{{Role: "system", StringContent: "instructions"}} + out := withMemory(base, "user is Bob; wants pizza") + Expect(len(out)).To(Equal(2)) + Expect(out[1].Role).To(Equal("system")) + Expect(out[1].StringContent).To(ContainSubstring("user is Bob")) + Expect(out[1].StringContent).To(ContainSubstring("Summary of earlier conversation")) + }) + + It("is a no-op when memory is empty", func() { + base := schema.Messages{{Role: "system", StringContent: "instructions"}} + Expect(withMemory(base, "")).To(HaveLen(1)) + }) +}) + +var _ = Describe("renderItemsTranscript", func() { + It("renders user and assistant text turns", func() { + items := []*types.MessageItemUnion{ + {User: &types.MessageItemUser{Content: []types.MessageContentInput{{Type: types.MessageContentTypeInputText, Text: "hi"}}}}, + {Assistant: &types.MessageItemAssistant{Content: []types.MessageContentOutput{{Type: types.MessageContentTypeText, Text: "hello"}}}}, + } + out := renderItemsTranscript(items) + Expect(out).To(ContainSubstring("user: hi")) + Expect(out).To(ContainSubstring("assistant: hello")) + }) + + // Realtime assistant *audio* turns store the spoken words in .Transcript, not + // .Text, so the transcript builder must emit .Transcript too or spoken turns + // would be dropped from the summary. 
+ It("renders an assistant audio turn from its transcript", func() { + items := []*types.MessageItemUnion{ + {Assistant: &types.MessageItemAssistant{Content: []types.MessageContentOutput{{Type: types.MessageContentTypeAudio, Transcript: "spoken words"}}}}, + } + Expect(renderItemsTranscript(items)).To(ContainSubstring("assistant: spoken words")) + }) +}) + +var _ = Describe("buildSummaryMessages", func() { + It("includes prior memory and the new transcript", func() { + msgs := buildSummaryMessages("prior facts", "user: hi", 512) + Expect(len(msgs)).To(Equal(2)) + Expect(msgs[0].Role).To(Equal("system")) + Expect(msgs[1].StringContent).To(ContainSubstring("prior facts")) + Expect(msgs[1].StringContent).To(ContainSubstring("user: hi")) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 29e11764385a8423917f0cca6963d52d6a004697 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:11:32 +0000 Subject: [PATCH 11/17] feat(realtime): server-side summarize-then-drop compactor Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_compaction.go | 106 ++++++++++++++++++ .../openai/realtime_compaction_test.go | 47 ++++++++ 2 files changed, 153 insertions(+) diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index a661d7e6f2cb..7d0cbf85f351 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -1,19 +1,29 @@ package openai import ( + "context" "fmt" + "regexp" "strings" + "time" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/xlog" ) const ( defaultMaxSummaryTokens = 512 memoryPrefix = "Summary of earlier conversation:\n" + // compactionTimeout bounds 
the summarizer call so a stuck model can't pin the + // compacting flag (and thus block all further compaction) forever. + compactionTimeout = 60 * time.Second ) +// thinkTagRe matches a span (dotall so it spans newlines). +var thinkTagRe = regexp.MustCompile(`(?s).*?`) + // withMemory inserts the rolling summary as a system message after the existing // (instructions) history. No-op when memory is empty. func withMemory(history schema.Messages, memory string) schema.Messages { @@ -195,3 +205,99 @@ func resolveCompaction(cfg *config.ModelConfig, maxHistory int) (enabled bool, t } return true, trigger, maxSummaryTokens, c.SummaryModel } + +// stripThinkTags removes any leaked spans from a summary. +func stripThinkTags(s string) string { + return strings.TrimSpace(thinkTagRe.ReplaceAllString(s, "")) +} + +// prefixMatches reports whether items begins with the same ids, in order, as +// snapshot — i.e. the overflow we summarized is still at the head (no concurrent +// client delete reshuffled it). +func prefixMatches(items, snapshot []*types.MessageItemUnion) bool { + if len(items) < len(snapshot) { + return false + } + for i := range snapshot { + if itemID(items[i]) != itemID(snapshot[i]) { + return false + } + } + return true +} + +// compact folds overflow items into conv.Memory and evicts them. It never holds +// conv.Lock across the summarizer call: snapshot under lock, summarize unlocked, +// commit under lock (re-validating the head is unchanged). On any error it +// leaves the conversation untouched — items are never dropped without a summary. +func (s *Session) compact(conv *Conversation, model Model) { + if model == nil { + return + } + // Snapshot. + conv.Lock.Lock() + if len(conv.Items) <= s.CompactionTrigger { + conv.Lock.Unlock() + return + } + cut := compactionCut(conv.Items, s.MaxHistoryItems) + if cut <= 0 { + conv.Lock.Unlock() + return + } + overflow := append([]*types.MessageItemUnion(nil), conv.Items[:cut]...) 
+ prior := conv.Memory + conv.Lock.Unlock() + + // Summarize (unlocked). + msgs := buildSummaryMessages(prior, renderItemsTranscript(overflow), s.MaxSummaryTokens) + ctx, cancel := context.WithTimeout(context.Background(), compactionTimeout) + defer cancel() + predFunc, err := model.Predict(ctx, msgs, nil, nil, nil, nil, nil, nil, nil, nil, nil) + if err != nil { + xlog.Warn("realtime compaction: summarizer predict failed", "error", err) + return + } + pred, err := predFunc() + if err != nil { + xlog.Warn("realtime compaction: summarizer inference failed", "error", err) + return + } + summary := stripThinkTags(pred.Response) + if summary == "" { + xlog.Warn("realtime compaction: empty summary, skipping eviction") + return + } + + // Commit. + conv.Lock.Lock() + defer conv.Lock.Unlock() + if !prefixMatches(conv.Items, overflow) { + xlog.Debug("realtime compaction: head changed during summary, skipping") + return + } + conv.Memory = summary + conv.Items = conv.Items[len(overflow):] + xlog.Debug("realtime compaction: evicted items into memory", "evicted", len(overflow), "remaining", len(conv.Items)) +} + +// maybeCompact schedules a background compaction when the live buffer has grown +// past the trigger and none is already running. Returns immediately. 
+func (s *Session) maybeCompact(conv *Conversation, model Model) { + if !s.CompactionEnabled || model == nil { + return + } + conv.Lock.Lock() + over := len(conv.Items) > s.CompactionTrigger + conv.Lock.Unlock() + if !over { + return + } + if !conv.compacting.CompareAndSwap(false, true) { + return + } + go func() { + defer conv.compacting.Store(false) + s.compact(conv, model) + }() +} diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 1a7af623cac9..a81fd414e6a6 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -1,9 +1,12 @@ package openai import ( + "errors" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" @@ -181,6 +184,50 @@ var _ = Describe("buildSummaryMessages", func() { }) }) +var _ = Describe("compact", func() { + user := func(id, text string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id, + Content: []types.MessageContentInput{{Type: types.MessageContentTypeInputText, Text: text}}}} + } + + It("summarizes overflow into Memory and evicts it, keeping the live tail", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{ + user("1", "a"), user("2", "b"), user("3", "c"), user("4", "d"), + user("5", "e"), user("6", "f"), user("7", "g"), user("8", "h"), + }} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4, MaxSummaryTokens: 512} + m := &fakeModel{predictResp: backend.LLMResponse{Response: "ROLLED UP"}} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("ROLLED UP")) + Expect(len(conv.Items)).To(Equal(4)) + Expect(itemID(conv.Items[0])).To(Equal("5")) + // The summarizer saw the evicted turns. 
+ Expect(m.lastMessages[1].StringContent).To(ContainSubstring("a")) + }) + + It("leaves Items and Memory untouched when the summarizer errors", func() { + items := []*types.MessageItemUnion{user("1", "a"), user("2", "b"), user("3", "c")} + conv := &Conversation{Items: items} + s := &Session{CompactionEnabled: true, CompactionTrigger: 2, MaxHistoryItems: 1, MaxSummaryTokens: 512} + m := &fakeModel{predictErr: errors.New("boom")} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("")) + Expect(len(conv.Items)).To(Equal(3)) + }) + + It("does nothing when items are at or below the trigger", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{user("1", "a")}} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4} + s.compact(conv, &fakeModel{predictResp: backend.LLMResponse{Response: "x"}}) + Expect(conv.Memory).To(Equal("")) + Expect(len(conv.Items)).To(Equal(1)) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From c967c2fe708501cd071ab7d6b06845949cfa21ea Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:15:07 +0000 Subject: [PATCH 12/17] test(realtime): unit-test prefixMatches eviction-safety predicate Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- .../openai/realtime_compaction_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index a81fd414e6a6..45155fcfba6b 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -228,6 +228,34 @@ var _ = Describe("compact", func() { }) }) +var _ = Describe("prefixMatches", func() { + user := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + + 
It("matches when items begins with the snapshot ids in order", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeTrue()) + }) + + It("matches an empty snapshot", func() { + Expect(prefixMatches([]*types.MessageItemUnion{user("1")}, nil)).To(BeTrue()) + }) + + It("fails when items is shorter than the snapshot (a concurrent delete shrank the head)", func() { + items := []*types.MessageItemUnion{user("1")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeFalse()) + }) + + It("fails when the head ids differ (a concurrent delete reordered the head)", func() { + items := []*types.MessageItemUnion{user("2"), user("3")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeFalse()) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 09909c10e902d8b0113483f64618dee15070c586 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:20:12 +0000 Subject: [PATCH 13/17] feat(realtime): resolve summarizer model + schedule compaction per turn Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 25 +++++++++++++++--- .../endpoints/openai/realtime_compaction.go | 17 ++++++++++++ .../openai/realtime_compaction_test.go | 26 +++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index e6e99cf4a76a..6f29d281f861 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -141,6 +141,12 @@ type Session struct { SummaryModel string MaxSummaryTokens int + // summarizerFactory lazily builds the model used for compaction summaries + // 
when summary_model is configured; nil means reuse the pipeline LLM. + summarizerFactory func() (Model, error) + summarizerOnce sync.Once + summarizerCached Model + // AssistantExecutor is non-nil when the session opted into the in-process // LocalAI Assistant tool surface. Tool calls whose name matches this // executor's catalog are run inproc and their output is fed back to the @@ -558,9 +564,7 @@ func runRealtimeSession(application *application.Application, t Transport, model // Create a default conversation conversationID := generateConversationID() conversation := &Conversation{ - ID: conversationID, - // TODO: We need to truncate the conversation items when a new item is added and we have run out of space. There are multiple places where items - // can be added so we could use a datastructure here that enforces truncation upon addition + ID: conversationID, Items: []*types.MessageItemUnion{}, } session.Conversations[conversationID] = conversation @@ -591,6 +595,18 @@ func runRealtimeSession(application *application.Application, t Transport, model } session.ModelInterface = m + if session.SummaryModel != "" { + summaryModelName := session.SummaryModel + sid := sessionID + session.summarizerFactory = func() (Model, error) { + summaryCfg, lerr := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(summaryModelName, application.ApplicationConfig()) + if lerr != nil { + return nil, fmt.Errorf("load summary model config %q: %w", summaryModelName, lerr) + } + return newModel(&summaryCfg.Pipeline, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), evaluator, buildRealtimeRoutingContext(application, sid)) + } + } + if cfg.Pipeline.VoiceGateEnabled() { gate, gerr := newVoiceGate( *cfg.Pipeline.VoiceRecognition, @@ -1707,6 +1723,9 @@ const maxAssistantToolTurns = 10 func triggerResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams) { 
triggerResponseAtTurn(ctx, session, conv, t, overrides, 0) + // Fold aged-out turns into the rolling memory off the critical path; the + // next turn reaps the smaller buffer. + session.maybeCompact(conv, session.summarizerModel()) } func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) { diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index 7d0cbf85f351..f6d9610f7e2f 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -281,6 +281,23 @@ func (s *Session) compact(conv *Conversation, model Model) { xlog.Debug("realtime compaction: evicted items into memory", "evicted", len(overflow), "remaining", len(conv.Items)) } +// summarizerModel resolves the model used to produce compaction summaries. +// Without a configured summary_model (or factory) it reuses the pipeline LLM. +func (s *Session) summarizerModel() Model { + if s.SummaryModel == "" || s.summarizerFactory == nil { + return s.ModelInterface + } + s.summarizerOnce.Do(func() { + m, err := s.summarizerFactory() + if err != nil { + xlog.Warn("realtime compaction: summary_model load failed, falling back to pipeline LLM", "model", s.SummaryModel, "error", err) + m = s.ModelInterface + } + s.summarizerCached = m + }) + return s.summarizerCached +} + // maybeCompact schedules a background compaction when the live buffer has grown // past the trigger and none is already running. Returns immediately. 
func (s *Session) maybeCompact(conv *Conversation, model Model) { diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index 45155fcfba6b..ea7cd25de5e0 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -256,6 +256,32 @@ var _ = Describe("prefixMatches", func() { }) }) +var _ = Describe("summarizerModel", func() { + It("returns the pipeline model when no summary_model is set", func() { + m := &fakeModel{} + s := &Session{ModelInterface: m} + Expect(s.summarizerModel()).To(Equal(m)) + }) + + It("uses the factory (once) when summary_model is set", func() { + pipeline := &fakeModel{} + small := &fakeModel{} + calls := 0 + s := &Session{ModelInterface: pipeline, SummaryModel: "tiny", + summarizerFactory: func() (Model, error) { calls++; return small, nil }} + Expect(s.summarizerModel()).To(Equal(small)) + Expect(s.summarizerModel()).To(Equal(small)) + Expect(calls).To(Equal(1)) + }) + + It("falls back to the pipeline model when the factory errors", func() { + pipeline := &fakeModel{} + s := &Session{ModelInterface: pipeline, SummaryModel: "tiny", + summarizerFactory: func() (Model, error) { return nil, errors.New("nope") }} + Expect(s.summarizerModel()).To(Equal(pipeline)) + }) +}) + var _ = Describe("itemID", func() { It("returns the id for each variant and empty for nil", func() { Expect(itemID(nil)).To(Equal("")) From 6c567ddb69c0e2957c158a8b00d95f4f7d7f2e43 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:25:54 +0000 Subject: [PATCH 14/17] docs(realtime): document conversation compaction + new item events Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- docs/content/features/openai-realtime.md | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/content/features/openai-realtime.md b/docs/content/features/openai-realtime.md 
index 48cfc93325f6..a6e99267efcb 100644 --- a/docs/content/features/openai-realtime.md +++ b/docs/content/features/openai-realtime.md @@ -68,6 +68,33 @@ pipeline: This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings. +### Conversation compaction (long sessions on CPU) + +By default a realtime session feeds only the last `max_history_items` turns to the LLM; older turns are dropped and forgotten. On CPU, long calls also grow expensive as the prompt fills with verbatim history. Enable `compaction` to instead fold older turns into a rolling summary, so long calls stay cheap without losing earlier context. + +Compaction works with two numbers: + +- **`max_history_items`** is the *live window* — the recent turns kept verbatim in the prompt. +- **`compaction.trigger_items`** is the *high-water mark* — let the buffer grow to here, then summarize the overflow (everything above `max_history_items`) into a rolling memory and evict it. It must be greater than `max_history_items`; if it is not, it is clamped up. + +The gap between the two controls how often summarization runs: a summary call fires roughly every `(trigger_items - max_history_items)` turns (here, about every 6 turns). + +```yaml +pipeline: + max_history_items: 6 # live window — recent turns kept verbatim + compaction: + enabled: true + trigger_items: 12 # summarize overflow back down to max_history_items + summary_model: "" # optional: a small model for the summary (CPU); default = pipeline LLM + max_summary_tokens: 512 +``` + +{{% notice tip %}} +On CPU, set `summary_model` to a small, fast model so compaction never competes with the conversation LLM for compute. Left empty, the pipeline's own LLM produces the summary. 
+{{% /notice %}} + +Clients can also manage history directly via the now-supported `conversation.item.delete`, `conversation.item.truncate`, and `input_audio_buffer.clear` realtime events. + ## Transports The Realtime API supports two transports: **WebSocket** and **WebRTC**. From 487868ed60b45cc23c3d12aa5db4c971a18983d4 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 14:33:02 +0000 Subject: [PATCH 15/17] fix(realtime): resolve summary model inside compaction goroutine (lazy, off-path) Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 2 +- core/http/endpoints/openai/realtime_compaction.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 6f29d281f861..d4d6a0ac40de 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1725,7 +1725,7 @@ func triggerResponse(ctx context.Context, session *Session, conv *Conversation, triggerResponseAtTurn(ctx, session, conv, t, overrides, 0) // Fold aged-out turns into the rolling memory off the critical path; the // next turn reaps the smaller buffer. - session.maybeCompact(conv, session.summarizerModel()) + session.maybeCompact(conv) } func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) { diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index f6d9610f7e2f..2cd5a9414f96 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -300,8 +300,8 @@ func (s *Session) summarizerModel() Model { // maybeCompact schedules a background compaction when the live buffer has grown // past the trigger and none is already running. Returns immediately. 
-func (s *Session) maybeCompact(conv *Conversation, model Model) { - if !s.CompactionEnabled || model == nil { +func (s *Session) maybeCompact(conv *Conversation) { + if !s.CompactionEnabled { return } conv.Lock.Lock() @@ -315,6 +315,13 @@ func (s *Session) maybeCompact(conv *Conversation, model Model) { } go func() { defer conv.compacting.Store(false) + // Resolve (and, for a configured summary_model, lazily load) the + // summarizer only when a compaction actually runs, off the response + // path — so the model load never blocks a user turn. + model := s.summarizerModel() + if model == nil { + return + } s.compact(conv, model) }() } From a5f84be436357575ac75a633ebb91bee9ac457eb Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 22 Jun 2026 15:00:15 +0000 Subject: [PATCH 16/17] refactor(realtime): reuse reasoning.ExtractReasoningComplete for summary stripping Replace the bespoke regex in the compactor with the shared pkg/reasoning extractor (via spokenReasoningConfig), matching the rest of the realtime path and covering all reasoning tag families, not just <think>.
Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_compaction.go | 19 +++++++++---------- .../openai/realtime_compaction_test.go | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go index 2cd5a9414f96..f79a2d7a240c 100644 --- a/core/http/endpoints/openai/realtime_compaction.go +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -3,13 +3,13 @@ package openai import ( "context" "fmt" - "regexp" "strings" "time" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/reasoning" "github.com/mudler/xlog" ) @@ -21,9 +21,6 @@ const ( compactionTimeout = 60 * time.Second ) -// thinkTagRe matches a span (dotall so it spans newlines). -var thinkTagRe = regexp.MustCompile(`(?s).*?`) - // withMemory inserts the rolling summary as a system message after the existing // (instructions) history. No-op when memory is empty. func withMemory(history schema.Messages, memory string) schema.Messages { @@ -206,11 +203,6 @@ func resolveCompaction(cfg *config.ModelConfig, maxHistory int) (enabled bool, t return true, trigger, maxSummaryTokens, c.SummaryModel } -// stripThinkTags removes any leaked spans from a summary. -func stripThinkTags(s string) string { - return strings.TrimSpace(thinkTagRe.ReplaceAllString(s, "")) -} - // prefixMatches reports whether items begins with the same ids, in order, as // snapshot — i.e. the overflow we summarized is still at the head (no concurrent // client delete reshuffled it). 
@@ -263,7 +255,14 @@ func (s *Session) compact(conv *Conversation, model Model) { xlog.Warn("realtime compaction: summarizer inference failed", "error", err) return } - summary := stripThinkTags(pred.Response) + // Strip any leaked reasoning/thinking spans using the same extractor the + // rest of the realtime path uses, rather than a bespoke regex. + rcfg := reasoning.Config{} + if mc := model.PredictConfig(); mc != nil { + rcfg = spokenReasoningConfig(mc.ReasoningConfig) + } + _, summary := reasoning.ExtractReasoningComplete(pred.Response, "", rcfg) + summary = strings.TrimSpace(summary) if summary == "" { xlog.Warn("realtime compaction: empty summary, skipping eviction") return diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go index ea7cd25de5e0..5b19a8259ecc 100644 --- a/core/http/endpoints/openai/realtime_compaction_test.go +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -219,6 +219,20 @@ var _ = Describe("compact", func() { Expect(len(conv.Items)).To(Equal(3)) }) + It("strips leaked reasoning tags from the summary via the shared extractor", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{ + user("1", "a"), user("2", "b"), user("3", "c"), user("4", "d"), + user("5", "e"), user("6", "f"), user("7", "g"), user("8", "h"), + }} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4, MaxSummaryTokens: 512} + m := &fakeModel{predictResp: backend.LLMResponse{Response: "<think>planning the summary</think>CLEAN SUMMARY"}} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("CLEAN SUMMARY")) + Expect(conv.Memory).ToNot(ContainSubstring("planning")) + }) + It("does nothing when items are at or below the trigger", func() { conv := &Conversation{Items: []*types.MessageItemUnion{user("1", "a")}} s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4} From f6edc05d827d00febddefe3e5d85e09296ce3014 Mon Sep 17 00:00:00 2001 From: 
Ettore Di Giacinto Date: Mon, 22 Jun 2026 16:35:58 +0000 Subject: [PATCH 17/17] fix(config): register pipeline.compaction fields in meta registry TestAllFieldsHaveRegistryEntries requires every ModelConfig field to have a UI/meta registry entry; add the four pipeline.compaction.* leaves so they render with proper labels/descriptions instead of the reflection fallback. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto --- core/config/meta/registry.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/core/config/meta/registry.go b/core/config/meta/registry.go index a1cfe4c9aabd..3476076e1442 100644 --- a/core/config/meta/registry.go +++ b/core/config/meta/registry.go @@ -537,6 +537,36 @@ func DefaultRegistry() map[string]FieldMetaOverride { Component: "number", Order: 79, }, + "pipeline.compaction.enabled": { + Section: "pipeline", + Label: "Compaction Enabled", + Description: "Fold conversation items that age out of the live window (Max History Items) into a rolling summary instead of dropping them, so long realtime sessions stay cheap without losing earlier context. Off by default.", + Component: "toggle", + Order: 80, + }, + "pipeline.compaction.trigger_items": { + Section: "pipeline", + Label: "Compaction Trigger Items", + Description: "High-water mark: once the live conversation exceeds this many items, the overflow above Max History Items is summarized and evicted. Must be greater than Max History Items; defaults to twice it. The gap controls how often summarization runs.", + Component: "number", + Order: 81, + }, + "pipeline.compaction.summary_model": { + Section: "pipeline", + Label: "Compaction Summary Model", + Description: "Optional smaller/cheaper model used to produce the rolling summary. Empty reuses the pipeline's own LLM. 
On CPU, a tiny model here keeps compaction from competing with the conversation LLM.", + Component: "input", + Advanced: true, + Order: 82, + }, + "pipeline.compaction.max_summary_tokens": { + Section: "pipeline", + Label: "Compaction Max Summary Tokens", + Description: "Advisory cap on the rolling summary length (fed to the summarizer prompt). Defaults to 512.", + Component: "number", + Advanced: true, + Order: 83, + }, // --- Functions --- "function.grammar.parallel_calls": {