Skip to content

Commit 012ff5b

Browse files
committed
catalog/lease: refactor lease manager storage for bulk API
Previously, the lease manager acquired descriptors one at a time. This caused performance bottlenecks when processing large numbers of descriptors. This change adds `acquireBatch` to the lease manager storage layer to allow optimized lease acquisition for multiple objects. The existing `acquire` method now invokes the new method. Informs: #158086 Release note: None
1 parent 44a50a0 commit 012ff5b

File tree

4 files changed

+158
-68
lines changed

4 files changed

+158
-68
lines changed

pkg/sql/catalog/lease/ie_writer_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,13 @@ func (w *ieWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields)
6161
return nil
6262
}
6363

64+
func (w *ieWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
65+
for _, l := range leases {
66+
if err := w.insertLease(ctx, txn, l); err != nil {
67+
return err
68+
}
69+
}
70+
return nil
71+
}
72+
6473
var _ writer = (*ieWriter)(nil)

pkg/sql/catalog/lease/kv_writer.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,29 @@ func leaseTableWithID(id descpb.ID, table systemschema.SystemTable) catalog.Tabl
5252
return mut.ImmutableCopy().(catalog.TableDescriptor)
5353
}
5454

55-
func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
56-
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
57-
if l.sessionID != nil {
58-
err := w.sessionBasedWriter.Insert(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
59-
if err != nil {
60-
return err
55+
func (w *kvWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
56+
if err := w.do(ctx, txn, leases, func(b *kv.Batch) error {
57+
for _, l := range leases {
58+
if l.sessionID != nil {
59+
err := w.sessionBasedWriter.Insert(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
60+
if err != nil {
61+
return err
62+
}
6163
}
6264
}
6365
return nil
6466
}); err != nil {
65-
return errors.Wrapf(err, "failed to insert lease %v", l)
67+
return errors.Wrapf(err, "failed to insert lease %v", leases)
6668
}
6769
return nil
6870
}
6971

72+
func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
73+
return w.insertLeases(ctx, txn, []leaseFields{l})
74+
}
75+
7076
func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
71-
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
77+
if err := w.do(ctx, txn, []leaseFields{l}, func(b *kv.Batch) error {
7278
if l.sessionID != nil {
7379
err := w.sessionBasedWriter.Delete(ctx, b, false /*kvTrace*/, leaseAsSessionBasedDatum(l)...)
7480
if err != nil {
@@ -85,7 +91,7 @@ func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields)
8591
type addToBatchFunc = func(*kv.Batch) error
8692

8793
func (w *kvWriter) do(
88-
ctx context.Context, txn *kv.Txn, lease leaseFields, addToBatch addToBatchFunc,
94+
ctx context.Context, txn *kv.Txn, lease []leaseFields, addToBatch addToBatchFunc,
8995
) error {
9096
run := (*kv.Txn).Run
9197
do := func(ctx context.Context, txn *kv.Txn) error {

pkg/sql/catalog/lease/kv_writer_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,4 +282,11 @@ func (t teeWriter) insertLease(ctx context.Context, txn *kv.Txn, fields leaseFie
282282
)
283283
}
284284

285+
func (t teeWriter) insertLeases(ctx context.Context, txn *kv.Txn, leases []leaseFields) error {
286+
return errors.CombineErrors(
287+
t.a.insertLeases(ctx, txn, leases),
288+
t.b.insertLeases(ctx, txn, leases),
289+
)
290+
}
291+
285292
var _ writer = (*teeWriter)(nil)

pkg/sql/catalog/lease/storage.go

Lines changed: 127 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type leaseFields struct {
8888
type writer interface {
8989
deleteLease(context.Context, *kv.Txn, leaseFields) error
9090
insertLease(context.Context, *kv.Txn, leaseFields) error
91+
insertLeases(context.Context, *kv.Txn, []leaseFields) error
9192
}
9293

9394
// LeaseRenewalDuration controls the default time before a lease expires when
@@ -122,23 +123,24 @@ func (s storage) crossValidateDuringRenewal() bool {
122123
return LeaseRenewalCrossValidate.Get(&s.settings.SV)
123124
}
124125

125-
// acquire a lease on the most recent version of a descriptor. The lease is tied
126-
// to the provided sqlliveness.Session. If a newer version (then lastVersion) of
127-
// the descriptor exists, this function will attempt to acquire a lease on it.
128-
// If no newer version exists, it returns a nil descriptor. If the lease cannot
129-
// be obtained because the descriptor is being dropped or is offline (currently
130-
// only applicable to tables), an inactiveTableError is returned.
131-
func (s storage) acquire(
126+
// acquireBatchResult is the result of a batch acquire operation.
127+
type acquireBatchResult struct {
128+
descs []catalog.Descriptor
129+
errs []error
130+
}
131+
132+
func (s storage) acquireBatch(
132133
ctx context.Context,
133134
session sqlliveness.Session,
134-
id descpb.ID,
135-
lastVersion descpb.DescriptorVersion,
136-
lastSessionID sqlliveness.SessionID,
137-
) (desc catalog.Descriptor, prefix []byte, _ error) {
135+
ids []descpb.ID,
136+
lastVersions []descpb.DescriptorVersion,
137+
lastSessionIDs []sqlliveness.SessionID,
138+
) (result acquireBatchResult, prefix []byte, err error) {
138139
ctx = multitenant.WithTenantCostControlExemption(ctx)
139140
prefix = s.getRegionPrefix()
140141
var sessionID []byte
141142
acquireInTxn := func(ctx context.Context, txn *kv.Txn) (err error) {
143+
result.errs = make([]error, len(ids))
142144
// Run the descriptor read as high-priority, thereby pushing any intents out
143145
// of its way. We don't want schema changes to prevent lease acquisitions;
144146
// we'd rather force them to refresh. Also this prevents deadlocks in cases
@@ -158,51 +160,80 @@ func (s storage) acquire(
158160
// Note that the expiration is part of the primary key in the table, so we
159161
// would not overwrite the old entry if we just were to do another insert.
160162
// repeatIteration = desc != nil
161-
if sessionID != nil && desc != nil {
162-
if err := s.writer.deleteLease(ctx, txn, leaseFields{
163-
regionPrefix: prefix,
164-
descID: desc.GetID(),
165-
version: desc.GetVersion(),
166-
instanceID: instanceID,
167-
sessionID: sessionID,
168-
}); err != nil {
169-
return errors.Wrap(err, "deleting ambiguously created lease")
163+
if sessionID != nil && result.descs != nil {
164+
for _, desc := range result.descs {
165+
// Skip descriptors that had no leases inserted.
166+
if desc == nil {
167+
continue
168+
}
169+
if err := s.writer.deleteLease(ctx, txn, leaseFields{
170+
regionPrefix: prefix,
171+
descID: desc.GetID(),
172+
version: desc.GetVersion(),
173+
instanceID: instanceID,
174+
sessionID: sessionID,
175+
}); err != nil {
176+
return errors.Wrap(err, "deleting ambiguously created lease")
177+
}
170178
}
171179
}
172180

173181
// Read into a temporary variable in case our read runs into
174182
// any retryable error. If we run into an error then the delete
175183
// above may need to be executed again.
176-
latestDesc, err := s.mustGetDescriptorByID(ctx, txn, id)
177-
184+
const isDescriptorRequired = false
185+
latestDescs, err := s.mustGetDescriptorByIDs(ctx, txn, ids, isDescriptorRequired)
178186
if err != nil {
179187
return err
180188
}
189+
numDescriptorsToInsert := 0
181190
// If the descriptor version hasn't changed, then no new lease to be
182191
// inserted unless the session ID has changed on us. No descriptor will
183192
// be set indicating to the caller that no new version exists.
184-
if latestDesc.GetVersion() == lastVersion &&
185-
lastSessionID == session.ID() {
186-
return nil
187-
}
188-
desc = latestDesc
189-
if err := catalog.FilterAddingDescriptor(desc); err != nil {
190-
return err
193+
for idx, latestDesc := range latestDescs {
194+
if latestDesc == nil {
195+
// The descriptor was not found.
196+
result.errs[idx] = catalog.NewDescriptorNotFoundError(ids[idx])
197+
continue
198+
}
199+
if lastVersions[idx] == latestDesc.GetVersion() &&
200+
lastSessionIDs[idx] == session.ID() {
201+
latestDescs[idx] = nil
202+
continue
203+
}
204+
if err := catalog.FilterAddingDescriptor(latestDesc); err != nil {
205+
result.errs[idx] = err
206+
latestDescs[idx] = nil
207+
continue
208+
}
209+
if err := catalog.FilterDroppedDescriptor(latestDesc); err != nil {
210+
result.errs[idx] = err
211+
latestDescs[idx] = nil
212+
continue
213+
}
214+
numDescriptorsToInsert += 1
191215
}
192-
if err := catalog.FilterDroppedDescriptor(desc); err != nil {
193-
return err
216+
result.descs = latestDescs
217+
if numDescriptorsToInsert == 0 {
218+
return nil
194219
}
195-
log.VEventf(ctx, 2, "storage attempting to acquire lease %v", desc)
196-
197220
sessionID = session.ID().UnsafeBytes()
198-
lf := leaseFields{
199-
regionPrefix: prefix,
200-
descID: desc.GetID(),
201-
version: desc.GetVersion(),
202-
instanceID: s.nodeIDContainer.SQLInstanceID(),
203-
sessionID: sessionID,
221+
leasesToInsert := make([]leaseFields, 0, numDescriptorsToInsert)
222+
for _, desc := range latestDescs {
223+
if desc == nil {
224+
continue
225+
}
226+
lf := leaseFields{
227+
regionPrefix: prefix,
228+
descID: desc.GetID(),
229+
version: desc.GetVersion(),
230+
instanceID: s.nodeIDContainer.SQLInstanceID(),
231+
sessionID: sessionID,
232+
}
233+
leasesToInsert = append(leasesToInsert, lf)
234+
log.VEventf(ctx, 2, "storage attempting to acquire lease %v", desc)
204235
}
205-
return s.writer.insertLease(ctx, txn, lf)
236+
return s.writer.insertLeases(ctx, txn, leasesToInsert)
206237
}
207238

208239
// Compute the maximum time we will retry ambiguous replica errors before
@@ -238,28 +269,49 @@ func (s storage) acquire(
238269
s.livenessProvider.PauseLivenessHeartbeat(ctx)
239270
extensionsBlocked = true
240271
}
241-
log.Dev.Infof(ctx, "retryable replica error occurred during lease acquisition for %v, retrying: %v", id, err)
272+
log.Dev.Infof(ctx, "retryable replica error occurred during lease acquisition for %v, retrying: %v", ids, err)
242273
continue
243274
case pgerror.GetPGCode(err) == pgcode.UniqueViolation:
244275
log.Dev.Infof(ctx, "uniqueness violation occurred due to concurrent lease"+
245-
" removal for %v, retrying: %v", id, err)
276+
" removal for %v, retrying: %v", ids, err)
246277
continue
247278
case err != nil:
248-
return nil, nil, err
279+
return acquireBatchResult{}, nil, err
249280
}
250-
// If desc is nil then no new descriptor was available to be leased.
251-
// i.e. the last version we leased is the latest and still held
252-
if desc == nil {
253-
return nil, nil, nil
254-
}
255-
log.VEventf(ctx, 2, "storage acquired lease %v", desc)
256-
if s.testingKnobs.LeaseAcquiredEvent != nil {
257-
s.testingKnobs.LeaseAcquiredEvent(desc, err)
281+
for _, desc := range result.descs {
282+
// No lease is needed for this descriptor.
283+
if desc == nil {
284+
continue
285+
}
286+
log.VEventf(ctx, 2, "storage acquired lease %v", desc)
287+
if s.testingKnobs.LeaseAcquiredEvent != nil {
288+
s.testingKnobs.LeaseAcquiredEvent(desc, err)
289+
}
290+
s.outstandingLeases.Inc(1)
258291
}
259-
s.outstandingLeases.Inc(1)
260-
return desc, prefix, nil
292+
return result, prefix, nil
293+
}
294+
return acquireBatchResult{}, nil, ctx.Err()
295+
}
296+
297+
// acquire a lease on the most recent version of a descriptor. The lease is tied
298+
// to the provided sqlliveness.Session. If a newer version (then lastVersion) of
299+
// the descriptor exists, this function will attempt to acquire a lease on it.
300+
// If no newer version exists, it returns a nil descriptor. If the lease cannot
301+
// be obtained because the descriptor is being dropped or is offline (currently
302+
// only applicable to tables), an inactiveTableError is returned.
303+
func (s storage) acquire(
304+
ctx context.Context,
305+
session sqlliveness.Session,
306+
id descpb.ID,
307+
lastVersion descpb.DescriptorVersion,
308+
lastSessionID sqlliveness.SessionID,
309+
) (desc catalog.Descriptor, prefix []byte, _ error) {
310+
batchResult, prefix, err := s.acquireBatch(ctx, session, []descpb.ID{id}, []descpb.DescriptorVersion{lastVersion}, []sqlliveness.SessionID{lastSessionID})
311+
if err != nil {
312+
return nil, nil, err
261313
}
262-
return nil, nil, ctx.Err()
314+
return batchResult.descs[0], prefix, batchResult.errs[0]
263315
}
264316

265317
// Release a previously acquired descriptor. Never let this method
@@ -383,13 +435,29 @@ func (s storage) newCatalogReader(ctx context.Context) catkv.CatalogReader {
383435
func (s storage) mustGetDescriptorByID(
384436
ctx context.Context, txn *kv.Txn, id descpb.ID,
385437
) (catalog.Descriptor, error) {
386-
cr := s.newCatalogReader(ctx)
387438
const isDescriptorRequired = true
388-
c, err := cr.GetByIDs(ctx, txn, []descpb.ID{id}, isDescriptorRequired, catalog.Any)
439+
descs, err := s.mustGetDescriptorByIDs(ctx, txn, []descpb.ID{id}, isDescriptorRequired)
440+
if err != nil {
441+
return nil, err
442+
}
443+
return descs[0], nil
444+
}
445+
446+
// mustGetDescriptorByIDs returns the descriptors for the given ids for
447+
// batch operations.
448+
func (s storage) mustGetDescriptorByIDs(
449+
ctx context.Context, txn *kv.Txn, ids []descpb.ID, isDescriptorRequired bool,
450+
) ([]catalog.Descriptor, error) {
451+
descs := make([]catalog.Descriptor, 0, len(ids))
452+
cr := s.newCatalogReader(ctx)
453+
c, err := cr.GetByIDs(ctx, txn, ids, isDescriptorRequired, catalog.Any, catkv.WithDescriptor(true))
389454
if err != nil {
390455
return nil, err
391456
}
392-
desc := c.LookupDescriptor(id)
457+
for _, id := range ids {
458+
desc := c.LookupDescriptor(id)
459+
descs = append(descs, desc)
460+
}
393461
validationLevel := catalog.ValidationLevelSelfOnly
394462
if s.crossValidateDuringRenewal() {
395463
validationLevel = validate.ImmutableRead
@@ -401,12 +469,12 @@ func (s storage) mustGetDescriptorByID(
401469
vd,
402470
catalog.ValidationReadTelemetry,
403471
validationLevel,
404-
desc,
472+
descs...,
405473
)
406474
if err := ve.CombinedError(); err != nil {
407475
return nil, err
408476
}
409-
return desc, nil
477+
return descs, nil
410478
}
411479

412480
func (s storage) getRegionPrefix() []byte {

0 commit comments

Comments
 (0)