@@ -108,6 +108,11 @@ var LockedLeaseTimestamp = settings.RegisterBoolSetting(settings.ApplicationLeve
108108 "descriptors used can be intentionally older to support this" ,
109109 false )
110110
111+ var MaxBatchLeaseCount = settings .RegisterIntSetting (settings .ApplicationLevel ,
112+ "sql.catalog.descriptor_lease.max_batch_lease_count" ,
113+ "the maximum number of descriptors to lease in a single batch" ,
114+ 1000 )
115+
111116// WaitForNoVersion returns once there are no unexpired leases left
112117// for any version of the descriptor.
113118func (m * Manager ) WaitForNoVersion (
@@ -1009,7 +1014,9 @@ func (m *Manager) insertDescriptorVersions(
10091014// get needs to see some descriptor updates that we know happened recently.
10101015func (m * Manager ) AcquireFreshestFromStore (ctx context.Context , id descpb.ID ) error {
10111016 // Create descriptorState if needed.
1012- _ = m .findDescriptorState (id , true /* create */ )
1017+ state := m .findDescriptorState (id , true /* create */ )
1018+ state .markAcquisitionStart (ctx )
1019+ defer state .markAcquisitionDone (ctx )
10131020 // We need to acquire a lease on a "fresh" descriptor, meaning that joining
10141021 // a potential in-progress lease acquisition is generally not good enough.
10151022 // If we are to join an in-progress acquisition, it needs to be an acquisition
@@ -1045,13 +1052,159 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
10451052 return nil
10461053}
10471054
1055+ // upsertDescriptorIntoState inserts a descriptor into the descriptor state.
1056+ func (m * Manager ) upsertDescriptorIntoState (
1057+ ctx context.Context , id descpb.ID , session sqlliveness.Session , desc catalog.Descriptor ,
1058+ ) error {
1059+ t := m .findDescriptorState (id , false /* create */ )
1060+ if t == nil {
1061+ return errors .AssertionFailedf ("could not find descriptor state for id %d" , id )
1062+ }
1063+ t .mu .Lock ()
1064+ t .mu .takenOffline = false
1065+ defer t .mu .Unlock ()
1066+ err := t .upsertLeaseLocked (ctx , desc , session , m .storage .getRegionPrefix ())
1067+ if err != nil {
1068+ return err
1069+ }
1070+ return nil
1071+ }
1072+
1073+ // maybePrepareDescriptorForBulkAcquisition sets up the descriptor state for a bulk
1074+ // acquisition. Returns true if the descriptor has been setup for bulk acquistion,
1075+ // and false if an acqusition is already fetching it.
1076+ func (m * Manager ) maybePrepareDescriptorForBulkAcquisition (
1077+ id descpb.ID , acquisitionCh chan struct {},
1078+ ) bool {
1079+ state := m .findDescriptorState (id , true )
1080+ state .mu .Lock ()
1081+ defer state .mu .Unlock ()
1082+ // Only work on descriptors that haven't been acquired or have no
1083+ // acquisition in progress.
1084+ if (state .mu .takenOffline || len (state .mu .active .data ) > 0 ) ||
1085+ state .mu .acquisitionsInProgress > 0 {
1086+ return false
1087+ }
1088+ // Mark the descriptor as having an acquisition in progress, any normal
1089+ // acquisition will wait for the bulk acquisition to finish.
1090+ state .mu .acquisitionsInProgress ++
1091+ state .mu .acquisitionChannel = acquisitionCh
1092+ return true
1093+ }
1094+
1095+ // completeBulkAcquisition clears up the descriptor state after a bulk acquisition.
1096+ func (m * Manager ) completeBulkAcquisition (id descpb.ID ) {
1097+ state := m .findDescriptorState (id , false )
1098+ if state == nil {
1099+ return
1100+ }
1101+ state .mu .Lock ()
1102+ defer state .mu .Unlock ()
1103+ state .mu .acquisitionChannel = nil
1104+ state .mu .acquisitionsInProgress -= 1
1105+ }
1106+
1107+ // EnsureBatch ensures that a set of IDs are already cached by the lease manager
1108+ // by doing a bulk acquisition. The acquisition will be done on a best effort
1109+ // basis, where missing descriptors, dropped, adding, or validation errors will
1110+ // be ignored.
1111+ func (m * Manager ) EnsureBatch (ctx context.Context , ids []descpb.ID ) error {
1112+ maxBatchSize := MaxBatchLeaseCount .Get (& m .storage .settings .SV )
1113+ idsToFetch := make ([]descpb.ID , 0 , min (len (ids ), int (maxBatchSize )))
1114+ lastVersion := make ([]descpb.DescriptorVersion , cap (idsToFetch ))
1115+ lastSession := make ([]sqlliveness.SessionID , cap (idsToFetch ))
1116+ for len (ids ) > 0 {
1117+ processNextBatch := func () error {
1118+ // Reset for the next batch.
1119+ idsToFetch = idsToFetch [:0 ]
1120+ acquisitionCh := make (chan struct {})
1121+ defer close (acquisitionCh )
1122+
1123+ // Figure out which IDs have no state object allocated.
1124+ batchCompleted := true
1125+ for idx , id := range ids {
1126+ if ! m .maybePrepareDescriptorForBulkAcquisition (id , acquisitionCh ) {
1127+ continue
1128+ }
1129+ idsToFetch = append (idsToFetch , id )
1130+ if len (idsToFetch ) == int (maxBatchSize ) {
1131+ ids = ids [idx + 1 :]
1132+ batchCompleted = false
1133+ break
1134+ }
1135+ }
1136+ // All of the entries in the slice have been processed.
1137+ if batchCompleted {
1138+ ids = nil
1139+ }
1140+ // Nothing needs to be fetched everything is cached in memory.
1141+ if len (idsToFetch ) == 0 {
1142+ return nil
1143+ }
1144+ // Clean up the acquisition state after-wards.
1145+ defer func () {
1146+ for _ , id := range idsToFetch {
1147+ m .completeBulkAcquisition (id )
1148+ }
1149+ }()
1150+ // Initial acquisition so version and session ID are blank for any
1151+ // previous version.
1152+ lastVersion = lastVersion [:len (idsToFetch )]
1153+ lastSession = lastSession [:len (idsToFetch )]
1154+ // Execute a batch acquisition at the storage layer.
1155+ session , err := m .storage .livenessProvider .Session (ctx )
1156+ if err != nil {
1157+ return errors .Wrapf (err , "lease acquisition was unable to resolve liveness session" )
1158+ }
1159+ batch , _ , err := m .storage .acquireBatch (ctx , session , idsToFetch , lastVersion , lastSession )
1160+ if err != nil {
1161+ return err
1162+ }
1163+ // Upsert the descriptors into the descriptor state.
1164+ for idx , id := range idsToFetch {
1165+ // If any descriptors in the batch have failed to acquire, we are going
1166+ // to skip them. This can be due to validation error, being dropped, or
1167+ // being added, or missing. The caller can surface these when it attempts
1168+ // to access these leases, and we will attempt acquisition again.
1169+ if batch .errs [idx ] != nil {
1170+ continue
1171+ }
1172+ if err := m .upsertDescriptorIntoState (ctx , id , session , batch .descs [idx ]); err != nil {
1173+ return err
1174+ }
1175+ }
1176+ return nil
1177+ }
1178+ if err := processNextBatch (); err != nil {
1179+ return err
1180+ }
1181+ }
1182+ return nil
1183+ }
1184+
10481185// If the lease cannot be obtained because the descriptor is in the process of
10491186// being dropped or offline, the error will be of type inactiveTableError.
10501187// The boolean returned is true if this call was actually responsible for the
1051- // lease acquisition.
1188+ // lease acquisition. Any callers must allocate the descriptor state and mark
1189+ // that an acquisition is in progress with descriptorState.markAcquisitionStart,
1190+ // before invoking this method.
10521191func (m * Manager ) acquireNodeLease (
10531192 ctx context.Context , id descpb.ID , typ AcquireType ,
10541193) (bool , error ) {
1194+ // Precondition: Ensure that the descriptor state is setup and ready for any
1195+ // acquisition.
1196+ if buildutil .CrdbTestBuild {
1197+ state := m .findDescriptorState (id , false )
1198+ if state == nil {
1199+ return false , errors .AssertionFailedf ("descriptor state must be allocated before acquistion of %d" , id )
1200+ }
1201+ state .mu .Lock ()
1202+ acquisition := state .mu .acquisitionsInProgress
1203+ state .mu .Unlock ()
1204+ if acquisition <= 0 {
1205+ return false , errors .AssertionFailedf ("descriptor acquistion must be marked before invoking acquire node lease on %d." , id )
1206+ }
1207+ }
10551208 start := timeutil .Now ()
10561209 log .VEventf (ctx , 2 , "acquiring lease for descriptor %d..." , id )
10571210 future , didAcquire := m .storage .group .DoChan (ctx ,
@@ -1064,6 +1217,25 @@ func (m *Manager) acquireNodeLease(
10641217 if m .IsDraining () {
10651218 return false , errLeaseManagerIsDraining
10661219 }
1220+ waitedForBulkAcquire , err := func () (bool , error ) {
1221+ state := m .findDescriptorState (id , false )
1222+ state .mu .Lock ()
1223+ acquisitionChannel := state .mu .acquisitionChannel
1224+ state .mu .Unlock ()
1225+ if acquisitionChannel == nil {
1226+ return false , nil
1227+ }
1228+ select {
1229+ case <- acquisitionChannel :
1230+ return true , nil
1231+ case <- ctx .Done ():
1232+ return false , ctx .Err ()
1233+ }
1234+ }()
1235+ if waitedForBulkAcquire || err != nil {
1236+ // We didn't do any acquiring and just waited for a bulk operation.
1237+ return false , err
1238+ }
10671239 newest := m .findNewest (id )
10681240 var currentVersion descpb.DescriptorVersion
10691241 var currentSessionID sqlliveness.SessionID
@@ -1086,22 +1258,6 @@ func (m *Manager) acquireNodeLease(
10861258 return desc , nil
10871259 }
10881260
1089- doUpsertion := func (desc catalog.Descriptor ) error {
1090- t := m .findDescriptorState (id , false /* create */ )
1091- if t == nil {
1092- return errors .AssertionFailedf ("could not find descriptor state for id %d" , id )
1093- }
1094- t .mu .Lock ()
1095- t .mu .takenOffline = false
1096- defer t .mu .Unlock ()
1097- err = t .upsertLeaseLocked (ctx , desc , session , m .storage .getRegionPrefix ())
1098- if err != nil {
1099- return err
1100- }
1101-
1102- return nil
1103- }
1104-
11051261 // These tables are special and can have their versions bumped
11061262 // without blocking on other nodes converging to that version.
11071263 if newest != nil && (id == keys .UsersTableID ||
@@ -1117,8 +1273,7 @@ func (m *Manager) acquireNodeLease(
11171273 if err != nil {
11181274 return false , err
11191275 }
1120- err = doUpsertion (desc )
1121- if err != nil {
1276+ if err := m .upsertDescriptorIntoState (ctx , id , session , desc ); err != nil {
11221277 return false , err
11231278 }
11241279
@@ -1139,8 +1294,7 @@ func (m *Manager) acquireNodeLease(
11391294 if desc == nil {
11401295 return true , nil
11411296 }
1142- err = doUpsertion (desc )
1143- if err != nil {
1297+ if err := m .upsertDescriptorIntoState (ctx , id , session , desc ); err != nil {
11441298 return false , err
11451299 }
11461300 }
@@ -2952,6 +3106,10 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescr
29523106 return
29533107 }
29543108 }
3109+ // Mark that an acquisition is in progress.
3110+ state := m .findDescriptorState (id , false )
3111+ state .markAcquisitionStart (ctx )
3112+ defer state .markAcquisitionDone (ctx )
29553113 if _ , err := m .acquireNodeLease (ctx , id , AcquireBackground ); err != nil {
29563114 log .Dev .Errorf (ctx , "refreshing descriptor: %d lease failed: %s" , id , err )
29573115
0 commit comments