Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 103 additions & 75 deletions ca/ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/bwesterb/mtc/umbilical/frozencas"
"github.com/bwesterb/mtc/umbilical/revocation"

"golang.org/x/crypto/cryptobyte"
bolt "go.etcd.io/bbolt"
)

type NewOpts struct {
Expand All @@ -46,9 +46,12 @@ type Handle struct {
b internal.Handle

// Covered by own lock

revocationChecker *revocation.Checker
queue *bolt.DB

// Mutable covered by b.mux

signer mtc.Signer
}

Expand Down Expand Up @@ -90,6 +93,16 @@ func Open(path string) (*Handle, error) {
h.b.Params.EvidencePolicy,
)
}

h.queue, err = bolt.Open(
h.queuePath(),
0644,
&bolt.Options{Timeout: time.Millisecond}, // no waiting on lock
)
if err != nil {
return nil, fmt.Errorf("bolt.Open(%s): %w", h.queuePath(), err)
}

return &h, nil
}

Expand All @@ -106,6 +119,8 @@ func (h *Handle) Close() error {
h.revocationChecker.Close()
}

h.queue.Close()

return nil
}

Expand All @@ -115,15 +130,17 @@ func (h *Handle) dropQueue() error {
if h.b.Closed {
return internal.ErrClosed
}
w, err := os.OpenFile(h.queuePath(), os.O_TRUNC, 0o644)
if err != nil {
return fmt.Errorf("truncating queue: %w", err)
}
err = w.Close()
if err != nil {
return fmt.Errorf("closing after truncation: %w", err)
}
return nil

return h.queue.Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket([]byte("queue")); err != nil {
fmt.Errorf("DeleteBucket: %w", err)
}

if _, err := tx.CreateBucket([]byte("queue")); err != nil {
fmt.Errorf("CreateBucket: %w", err)
}
return nil
})
}

func (h *Handle) queueMultiple(ars []mtc.AssertionRequest) error {
Expand Down Expand Up @@ -245,40 +262,53 @@ func (h *Handle) queueMultiple(ars []mtc.AssertionRequest) error {
h.b.Mux.Lock()
locked = true

w, err := os.OpenFile(h.queuePath(), os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("opening queue: %w", err)
}
defer w.Close()
bw := bufio.NewWriter(w)
return h.queue.Update(func(tx *bolt.Tx) error {
var key [mtc.HashLen]byte

for i, ar := range ars {
if notAfter[i] != ar.NotAfter {
ar.NotAfter = notAfter[i]
ar.Checksum = nil // Recompute the checksum.
bucket := tx.Bucket([]byte("queue"))
if bucket == nil {
return errors.New("Missing queue bucket")
}

buf, err := ar.MarshalBinary()
if err != nil {
return err
}
for i, ar := range ars {
if notAfter[i] != ar.NotAfter {
ar.NotAfter = notAfter[i]
ar.Checksum = nil // Recompute the checksum.
}

var b cryptobyte.Builder
b.AddUint16(uint16(len(buf)))
prefix, _ := b.Bytes()
if err := ar.Assertion.EntryKey(key[:]); err != nil {
return err
}

_, err = bw.Write(prefix)
if err != nil {
return fmt.Errorf("writing to queue: %w", err)
}
buf, err := ar.MarshalBinary()
if err != nil {
return err
}

_, err = bw.Write(buf)
if err != nil {
return fmt.Errorf("writing to queue: %w", err)
// Check if this assertion is already queued
buf2 := bucket.Get(key[:])
if buf2 != nil {
var ar2 mtc.AssertionRequest
err = ar2.UnmarshalBinary(buf2)
if err != nil {
return fmt.Errorf("parsing already queued %x: %w", key, err)
}

if ar2.NotAfter.After(ar.NotAfter) {
// Already queued assertion is valid for longer, so let's
// stick with that one.
continue
}
}

err = bucket.Put(key[:], buf)
if err != nil {
return fmt.Errorf("writing to queue: %w", err)
}
}
}

return bw.Flush()
return nil
})
}

// Queue multiple assertions for publication.
Expand Down Expand Up @@ -1035,18 +1065,32 @@ func New(path string, opts NewOpts) (*Handle, error) {
return nil, fmt.Errorf("writing %s: %w", h.skPath(), err)
}

// Queue
if err := os.WriteFile(h.queuePath(), []byte{}, 0o644); err != nil {
return nil, fmt.Errorf("Writing %s: %w", h.queuePath(), err)
}

// Accepted roots
if h.b.Params.EvidencePolicy == mtc.UmbilicalEvidencePolicy {
if err := os.WriteFile(h.b.UmbilicalRootsPath(), opts.UmbilicalRootsPEM, 0o644); err != nil {
return nil, fmt.Errorf("Writing %s: %w", h.b.UmbilicalRootsPath(), err)
}
}

// Queue
h.queue, err = bolt.Open(
h.queuePath(),
0644,
&bolt.Options{Timeout: time.Millisecond}, // no waiting on lock
)
if err != nil {
return nil, fmt.Errorf("bolt.Open(%s): %w", h.queuePath(), err)
}

err = h.queue.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket([]byte("queue"))
return err
})
if err != nil {
h.queue.Close()
return nil, fmt.Errorf("queue: failed to create bucket: %w", err)
}

unlock = false
return &h, nil
}
Expand All @@ -1062,46 +1106,30 @@ func (h *Handle) WalkQueue(f func(mtc.AssertionRequest) error) error {

// Same as WalkQueue, but asummes read lock on mux.
func (h *Handle) walkQueue(f func(mtc.AssertionRequest) error) error {
r, err := os.OpenFile(h.queuePath(), os.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("Opening queue: %w", err)
}
defer r.Close()
return h.queue.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("queue"))
if bucket == nil {
return errors.New("Missing queue bucket")
}

br := bufio.NewReader(r)
c := bucket.Cursor()

for {
var (
prefix [2]byte
aLen uint16
ar mtc.AssertionRequest
)
_, err := io.ReadFull(br, prefix[:])
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("Reading queue: %w", err)
}
s := cryptobyte.String(prefix[:])
_ = s.ReadUint16(&aLen)
var ar mtc.AssertionRequest

buf := make([]byte, int(aLen))
_, err = io.ReadFull(br, buf)
if err != nil {
return fmt.Errorf("Reading queue: %w", err)
}
for key, buf := c.First(); key != nil; key, buf = c.Next() {
err := ar.UnmarshalBinary(buf)
if err != nil {
return fmt.Errorf("parsing queued assertion %x: %w", key, err)
}

err = ar.UnmarshalBinary(buf)
if err != nil {
return fmt.Errorf("Parsing queue: %w", err)
err = f(ar)
if err != nil {
return err
}
}

err = f(ar)
if err != nil {
return err
}
}
return nil
})

return nil
}
Expand Down