diff --git a/ca/ca.go b/ca/ca.go index d09eb0b..6db09dd 100644 --- a/ca/ca.go +++ b/ca/ca.go @@ -19,7 +19,7 @@ import ( "github.com/bwesterb/mtc/umbilical/frozencas" "github.com/bwesterb/mtc/umbilical/revocation" - "golang.org/x/crypto/cryptobyte" + bolt "go.etcd.io/bbolt" ) type NewOpts struct { @@ -46,9 +46,12 @@ type Handle struct { b internal.Handle // Covered by own lock + revocationChecker *revocation.Checker + queue *bolt.DB // Mutable covered by b.mux + signer mtc.Signer } @@ -90,6 +93,16 @@ func Open(path string) (*Handle, error) { h.b.Params.EvidencePolicy, ) } + + h.queue, err = bolt.Open( + h.queuePath(), + 0644, + &bolt.Options{Timeout: time.Millisecond}, // no waiting on lock + ) + if err != nil { + return nil, fmt.Errorf("bolt.Open(%s): %w", h.queuePath(), err) + } + return &h, nil } @@ -106,6 +119,8 @@ func (h *Handle) Close() error { h.revocationChecker.Close() } + h.queue.Close() + return nil } @@ -115,15 +130,17 @@ func (h *Handle) dropQueue() error { if h.b.Closed { return internal.ErrClosed } - w, err := os.OpenFile(h.queuePath(), os.O_TRUNC, 0o644) - if err != nil { - return fmt.Errorf("truncating queue: %w", err) - } - err = w.Close() - if err != nil { - return fmt.Errorf("closing after truncation: %w", err) - } - return nil + + return h.queue.Update(func(tx *bolt.Tx) error { + if err := tx.DeleteBucket([]byte("queue")); err != nil { + fmt.Errorf("DeleteBucket: %w", err) + } + + if _, err := tx.CreateBucket([]byte("queue")); err != nil { + fmt.Errorf("CreateBucket: %w", err) + } + return nil + }) } func (h *Handle) queueMultiple(ars []mtc.AssertionRequest) error { @@ -245,40 +262,53 @@ func (h *Handle) queueMultiple(ars []mtc.AssertionRequest) error { h.b.Mux.Lock() locked = true - w, err := os.OpenFile(h.queuePath(), os.O_APPEND|os.O_WRONLY, 0o644) - if err != nil { - return fmt.Errorf("opening queue: %w", err) - } - defer w.Close() - bw := bufio.NewWriter(w) + return h.queue.Update(func(tx *bolt.Tx) error { + var key [mtc.HashLen]byte - for i, ar := range ars { - if notAfter[i] != ar.NotAfter { - ar.NotAfter = notAfter[i] - ar.Checksum = nil // Recompute the checksum. + bucket := tx.Bucket([]byte("queue")) + if bucket == nil { + return errors.New("Missing queue bucket") } - buf, err := ar.MarshalBinary() - if err != nil { - return err - } + for i, ar := range ars { + if notAfter[i] != ar.NotAfter { + ar.NotAfter = notAfter[i] + ar.Checksum = nil // Recompute the checksum. + } - var b cryptobyte.Builder - b.AddUint16(uint16(len(buf))) - prefix, _ := b.Bytes() + if err := ar.Assertion.EntryKey(key[:]); err != nil { + return err + } - _, err = bw.Write(prefix) - if err != nil { - return fmt.Errorf("writing to queue: %w", err) - } + buf, err := ar.MarshalBinary() + if err != nil { + return err + } - _, err = bw.Write(buf) - if err != nil { - return fmt.Errorf("writing to queue: %w", err) + // Check if this assertion is already queued + buf2 := bucket.Get(key[:]) + if buf2 != nil { + var ar2 mtc.AssertionRequest + err = ar2.UnmarshalBinary(buf2) + if err != nil { + return fmt.Errorf("parsing already queued %x: %w", key, err) + } + + if ar2.NotAfter.After(ar.NotAfter) { + // Already queued assertion is valid for longer, so let's + // stick with that one. + continue + } + } + + err = bucket.Put(key[:], buf) + if err != nil { + return fmt.Errorf("writing to queue: %w", err) + } } - } - return bw.Flush() + return nil + }) } // Queue multiple assertions for publication. @@ -1035,11 +1065,6 @@ func New(path string, opts NewOpts) (*Handle, error) { return nil, fmt.Errorf("writing %s: %w", h.skPath(), err) } - // Queue - if err := os.WriteFile(h.queuePath(), []byte{}, 0o644); err != nil { - return nil, fmt.Errorf("Writing %s: %w", h.queuePath(), err) - } - // Accepted roots if h.b.Params.EvidencePolicy == mtc.UmbilicalEvidencePolicy { if err := os.WriteFile(h.b.UmbilicalRootsPath(), opts.UmbilicalRootsPEM, 0o644); err != nil { @@ -1047,6 +1072,25 @@ func New(path string, opts NewOpts) (*Handle, error) { } } + // Queue + h.queue, err = bolt.Open( + h.queuePath(), + 0644, + &bolt.Options{Timeout: time.Millisecond}, // no waiting on lock + ) + if err != nil { + return nil, fmt.Errorf("bolt.Open(%s): %w", h.queuePath(), err) + } + + err = h.queue.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucket([]byte("queue")) + return err + }) + if err != nil { + h.queue.Close() + return nil, fmt.Errorf("queue: failed to create bucket: %w", err) + } + unlock = false return &h, nil } @@ -1062,46 +1106,30 @@ func (h *Handle) WalkQueue(f func(mtc.AssertionRequest) error) error { // Same as WalkQueue, but asummes read lock on mux. func (h *Handle) walkQueue(f func(mtc.AssertionRequest) error) error { - r, err := os.OpenFile(h.queuePath(), os.O_RDONLY, 0) - if err != nil { - return fmt.Errorf("Opening queue: %w", err) - } - defer r.Close() + return h.queue.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("queue")) + if bucket == nil { + return errors.New("Missing queue bucket") + } - br := bufio.NewReader(r) + c := bucket.Cursor() - for { - var ( - prefix [2]byte - aLen uint16 - ar mtc.AssertionRequest - ) - _, err := io.ReadFull(br, prefix[:]) - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("Reading queue: %w", err) - } - s := cryptobyte.String(prefix[:]) - _ = s.ReadUint16(&aLen) + var ar mtc.AssertionRequest - buf := make([]byte, int(aLen)) - _, err = io.ReadFull(br, buf) - if err != nil { - return fmt.Errorf("Reading queue: %w", err) - } + for key, buf := c.First(); key != nil; key, buf = c.Next() { + err := ar.UnmarshalBinary(buf) + if err != nil { + return fmt.Errorf("parsing queued assertion %x: %w", key, err) + } - err = ar.UnmarshalBinary(buf) - if err != nil { - return fmt.Errorf("Parsing queue: %w", err) + err = f(ar) + if err != nil { + return err + } } - err = f(ar) - if err != nil { - return err - } - } + return nil + }) return nil }