Skip to content

Commit fac8048

Browse files
committed
feat: add socket auditor for forwarding logs to coder agent
Add SocketAuditor that sends audit logs to the Coder workspace agent via a Unix socket. This enables boundary audit events to be forwarded to coderd for centralized logging. Implementation notes: - Batching: 10 logs or 5 seconds, whichever comes first - Wire format: length-prefixed protobuf. proto imported from AgentAPI to simplify boundary -> agent -> coderd forwarding to start.
1 parent 9c9c878 commit fac8048

File tree

6 files changed

+810
-87
lines changed

6 files changed

+810
-87
lines changed

audit/multi_auditor.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package audit
2+
3+
// MultiAuditor wraps multiple auditors and sends audit events to all of them.
4+
type MultiAuditor struct {
5+
auditors []Auditor
6+
}
7+
8+
// NewMultiAuditor creates a new MultiAuditor that sends to all provided auditors.
9+
func NewMultiAuditor(auditors ...Auditor) *MultiAuditor {
10+
return &MultiAuditor{auditors: auditors}
11+
}
12+
13+
// AuditRequest sends the request to all wrapped auditors.
14+
func (m *MultiAuditor) AuditRequest(req Request) {
15+
for _, a := range m.auditors {
16+
a.AuditRequest(req)
17+
}
18+
}

audit/socket_auditor.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package audit
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"time"
10+
11+
"google.golang.org/protobuf/proto"
12+
"google.golang.org/protobuf/types/known/timestamppb"
13+
14+
agentproto "github.com/coder/coder/v2/agent/proto"
15+
)
16+
17+
const (
	// DefaultAuditSocketPath is the well-known location of the boundary
	// audit socket. The Coder agent is expected to listen on it in order
	// to receive audit logs.
	DefaultAuditSocketPath = "/tmp/boundary-audit.sock"

	// defaultBatchSize is the number of queued logs that triggers a flush.
	defaultBatchSize = 10
	// defaultBatchTimerDuration is how long a non-empty batch may wait
	// before it is flushed.
	defaultBatchTimerDuration = 5 * time.Second
)
24+
25+
// SocketAuditor implements the Auditor interface. It sends logs to the
26+
// workspace agent's boundary log proxy socket. It queues logs and sends
27+
// them in batches using a batch size and timer. The internal queue operates
28+
// as a FIFO i.e., logs are sent in the order they are received and dropped
29+
// if the queue is full.
30+
//
31+
// The proto messages sent to the agent are framed as follows:
32+
// [4 bit tag][28 bit length][length bytes of encoded protobuf]
33+
//
34+
// The tag is currently always 1, but may be extended in the future (e.g.
35+
// to support compression).
36+
type SocketAuditor struct {
37+
socketPath string
38+
logger *slog.Logger
39+
logCh chan *agentproto.BoundaryLog
40+
batchSize int
41+
batchTimerDuration time.Duration
42+
43+
// onFlushAttempt is called after each flush attempt (intended for testing).
44+
onFlushAttempt func()
45+
}
46+
47+
// NewSocketAuditor creates a new SocketAuditor that sends logs to the agent's
48+
// boundary log proxy socket at DefaultAuditSocketPath after SocketAuditor.Loop
49+
// is called.
50+
func NewSocketAuditor(logger *slog.Logger) *SocketAuditor {
51+
return &SocketAuditor{
52+
socketPath: DefaultAuditSocketPath,
53+
logger: logger,
54+
logCh: make(chan *agentproto.BoundaryLog, 2*defaultBatchSize),
55+
batchSize: defaultBatchSize,
56+
batchTimerDuration: defaultBatchTimerDuration,
57+
}
58+
}
59+
60+
// AuditRequest implements the Auditor interface. It queues the log to be sent to the
61+
// agent in a batch.
62+
func (s *SocketAuditor) AuditRequest(req Request) {
63+
httpReq := &agentproto.BoundaryLog_HttpRequest{
64+
Method: req.Method,
65+
Url: req.URL,
66+
}
67+
// Only include the matched rule for denied requests, as documented in
68+
// the proto schema.
69+
if !req.Allowed {
70+
httpReq.MatchedRule = req.Rule
71+
}
72+
73+
log := &agentproto.BoundaryLog{
74+
Allowed: req.Allowed,
75+
Time: timestamppb.Now(),
76+
Resource: &agentproto.BoundaryLog_HttpRequest_{HttpRequest: httpReq},
77+
}
78+
79+
select {
80+
case s.logCh <- log:
81+
default:
82+
s.logger.Warn("audit log dropped, channel full")
83+
}
84+
}
85+
86+
// flushErr is the error type returned by flush. It distinguishes permanent
// errors (bad data, which retrying cannot fix) from transient errors (network
// issues, which may succeed on a later attempt).
type flushErr struct {
	// err is the underlying cause.
	err error
	// permanent is true when retrying the same batch cannot succeed.
	permanent bool
}

// Error returns the message of the underlying error.
func (e *flushErr) Error() string { return e.err.Error() }

// Unwrap exposes the underlying error so that errors.Is and errors.As can
// inspect the cause.
func (e *flushErr) Unwrap() error { return e.err }
94+
95+
// flush sends the current batch of logs to the given connection.
96+
func flush(conn net.Conn, logs []*agentproto.BoundaryLog) *flushErr {
97+
if len(logs) == 0 {
98+
return nil
99+
}
100+
101+
req := &agentproto.ReportBoundaryLogsRequest{
102+
Logs: logs,
103+
}
104+
105+
data, err := proto.Marshal(req)
106+
if err != nil {
107+
return &flushErr{err: err, permanent: true}
108+
}
109+
110+
if len(data) > 1<<28 {
111+
return &flushErr{err: fmt.Errorf("data too large: %d bytes", len(data)), permanent: true}
112+
}
113+
114+
var header uint32
115+
header |= uint32(len(data))
116+
header |= 1 << 28
117+
118+
if err := binary.Write(conn, binary.BigEndian, header); err != nil {
119+
return &flushErr{err: err}
120+
}
121+
if _, err := conn.Write(data); err != nil {
122+
return &flushErr{err: err}
123+
}
124+
return nil
125+
}
126+
127+
// Loop handles the I/O to send audit logs to the agent.
128+
func (s *SocketAuditor) Loop(ctx context.Context) {
129+
var conn net.Conn
130+
batch := make([]*agentproto.BoundaryLog, 0, s.batchSize)
131+
t := time.NewTimer(0)
132+
t.Stop()
133+
134+
// connect attempts to establish a connection to the socket.
135+
connect := func() {
136+
if conn != nil {
137+
return
138+
}
139+
var err error
140+
conn, err = net.Dial("unix", s.socketPath)
141+
if err != nil {
142+
s.logger.Warn("failed to connect to audit socket", "path", s.socketPath, "error", err)
143+
conn = nil
144+
}
145+
}
146+
147+
// closeConn closes the current connection if open.
148+
closeConn := func() {
149+
if conn != nil {
150+
_ = conn.Close()
151+
conn = nil
152+
}
153+
}
154+
155+
// clearBatch resets the length of the batch and frees memory while preserving
156+
// the batch slice backing array.
157+
clearBatch := func() {
158+
for i := range len(batch) {
159+
batch[i] = nil
160+
}
161+
batch = batch[:0]
162+
}
163+
164+
// doFlush flushes the batch and handles errors by reconnecting.
165+
doFlush := func() {
166+
t.Stop()
167+
defer func() {
168+
if s.onFlushAttempt != nil {
169+
s.onFlushAttempt()
170+
}
171+
}()
172+
if len(batch) == 0 {
173+
return
174+
}
175+
connect()
176+
if conn == nil {
177+
// No connection: logs will be retried on next flush.
178+
return
179+
}
180+
181+
if err := flush(conn, batch); err != nil {
182+
s.logger.Warn("failed to flush audit logs", "error", err)
183+
if err.permanent {
184+
// Data error: discard batch to avoid infinite retries.
185+
clearBatch()
186+
} else {
187+
// Network error: close connection but keep batch for a future retry.
188+
closeConn()
189+
}
190+
return
191+
}
192+
193+
clearBatch()
194+
}
195+
196+
connect()
197+
198+
for {
199+
select {
200+
case <-ctx.Done():
201+
// Drain any pending logs before the last flush. Not concerned about
202+
// growing the batch slice here since we're exiting.
203+
drain:
204+
for {
205+
select {
206+
case log := <-s.logCh:
207+
batch = append(batch, log)
208+
default:
209+
break drain
210+
}
211+
}
212+
213+
doFlush()
214+
closeConn()
215+
return
216+
case <-t.C:
217+
doFlush()
218+
case log := <-s.logCh:
219+
// If batch is at capacity, attempt flushing first and drop the log if
220+
// the batch still full.
221+
if len(batch) >= s.batchSize {
222+
doFlush()
223+
if len(batch) >= s.batchSize {
224+
s.logger.Warn("audit log dropped, batch full")
225+
continue
226+
}
227+
}
228+
229+
batch = append(batch, log)
230+
231+
if len(batch) == 1 {
232+
t.Reset(s.batchTimerDuration)
233+
}
234+
235+
if len(batch) >= s.batchSize {
236+
doFlush()
237+
}
238+
}
239+
}
240+
}

0 commit comments

Comments
 (0)