Skip to content

Commit ebe8d76

Browse files
committed
sql/inspect: record table version in INSPECT job to detect schema drift
Previously, the INSPECT job only recorded the table IDs involved in the job. However, there is a time gap between when the job record is created and when the job is executed, during which schema changes may occur. These changes can invalidate assumptions made at job creation (e.g. index ID existence). This change introduces table version tracking into the job record. Before executing any checks, the job will verify that the current table versions match those stored at creation. If a mismatch is detected, the job will fail early. This mechanism mirrors similar protections implemented in other jobs, such as TTL jobs. Informs #158197 Release note: none
1 parent eb2465b commit ebe8d76

File tree

6 files changed

+134
-14
lines changed

6 files changed

+134
-14
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,6 +1474,14 @@ message InspectDetails {
14741474
(gogoproto.customname) = "IndexID",
14751475
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"
14761476
];
1477+
1478+
// TableVersion records the descriptor version observed when the check was
1479+
// planned. It is used at execution time to detect concurrent schema changes
1480+
// and abort the job before running inconsistent checks.
1481+
uint64 table_version = 4 [
1482+
(gogoproto.customname) = "TableVersion",
1483+
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"
1484+
];
14771485
}
14781486

14791487
// Checks is the list of individual checks this job will perform.

pkg/sql/importer/import_job.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,32 +327,45 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
327327
if err != nil {
328328
return err
329329
}
330-
tblDesc := tabledesc.NewBuilder(table.Desc).BuildImmutableTable()
331-
if len(tblDesc.PublicNonPrimaryIndexes()) > 0 {
332-
checks, err := inspect.ChecksForTable(ctx, nil /* p */, tblDesc)
330+
331+
var checks []*jobspb.InspectDetails_Check
332+
var tableName string
333+
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(
334+
ctx context.Context, txn descs.Txn,
335+
) error {
336+
// INSPECT requires the latest descriptor. The one cached in the job is
337+
// out of date as it has an old table version.
338+
tblDesc, err := txn.Descriptors().ByIDWithoutLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, table.Desc.ID)
333339
if err != nil {
334340
return err
335341
}
342+
tableName = tblDesc.GetName()
343+
checks, err = inspect.ChecksForTable(ctx, nil /* p */, tblDesc)
344+
return err
345+
}); err != nil {
346+
return err
347+
}
336348

349+
if len(checks) > 0 {
337350
job, err := inspect.TriggerJob(
338351
ctx,
339-
fmt.Sprintf("import-validation-%s", tblDesc.GetName()),
352+
fmt.Sprintf("import-validation-%s", tableName),
340353
p.ExecCfg(),
341354
nil, /* txn */
342355
checks,
343356
setPublicTimestamp,
344357
)
345358
if err != nil {
346-
return errors.Wrapf(err, "failed to trigger inspect for import validation for table %s", tblDesc.GetName())
359+
return errors.Wrapf(err, "failed to trigger inspect for import validation for table %s", tableName)
347360
}
348-
log.Eventf(ctx, "triggered inspect job %d for import validation for table %s with AOST %s", job.ID(), tblDesc.GetName(), setPublicTimestamp)
361+
log.Eventf(ctx, "triggered inspect job %d for import validation for table %s with AOST %s", job.ID(), tableName, setPublicTimestamp)
349362

350363
// For sync mode, wait for the inspect job to complete.
351364
if validationMode == ImportRowCountValidationSync {
352365
if err := p.ExecCfg().JobRegistry.WaitForJobs(ctx, []jobspb.JobID{job.ID()}); err != nil {
353-
return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", job.ID(), tblDesc.GetName())
366+
return errors.Wrapf(err, "failed to wait for inspect job %d for table %s", job.ID(), tableName)
354367
}
355-
log.Eventf(ctx, "inspect job %d completed for table %s", job.ID(), tblDesc.GetName())
368+
log.Eventf(ctx, "inspect job %d completed for table %s", job.ID(), tableName)
356369
}
357370
}
358371
}

pkg/sql/inspect/index_consistency_check.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"fmt"
1212
"strings"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/keys"
1516
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -80,7 +81,10 @@ type indexConsistencyCheck struct {
8081

8182
flowCtx *execinfra.FlowCtx
8283
indexID descpb.IndexID
83-
asOf hlc.Timestamp
84+
// tableVersion is the descriptor version recorded when the check was planned.
85+
// It is used to detect concurrent schema changes for non-AS OF inspections.
86+
tableVersion descpb.DescriptorVersion
87+
asOf hlc.Timestamp
8488

8589
tableDesc catalog.TableDescriptor
8690
secIndex catalog.Index
@@ -448,6 +452,17 @@ func (c *indexConsistencyCheck) loadCatalogInfo(ctx context.Context) error {
448452
if err != nil {
449453
return err
450454
}
455+
if c.tableVersion != 0 && c.tableDesc.GetVersion() != c.tableVersion {
456+
return errors.WithHintf(
457+
errors.Newf(
458+
"table %s [%d] has had a schema change since the job has started at %s",
459+
c.tableDesc.GetName(),
460+
c.tableDesc.GetID(),
461+
c.tableDesc.GetModificationTime().GoTime().Format(time.RFC3339),
462+
),
463+
"use AS OF SYSTEM TIME to avoid schema changes during inspection",
464+
)
465+
}
451466

452467
c.priIndex = c.tableDesc.GetPrimaryIndex()
453468

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package inspect
77

88
import (
99
"context"
10+
gosql "database/sql"
1011
"fmt"
1112
"testing"
1213
"time"
@@ -766,3 +767,74 @@ func TestMissingIndexEntryWithHistoricalQuery(t *testing.T) {
766767
require.Contains(t, err.Error(), "INSPECT found inconsistencies",
767768
"INSPECT should detect the missing index entry that existed at the historical timestamp")
768769
}
770+
771+
func TestInspectWithoutASOFSchemaChange(t *testing.T) {
772+
defer leaktest.AfterTest(t)()
773+
defer log.Scope(t).Close(t)
774+
775+
ctx := context.Background()
776+
unblockCh := make(chan struct{})
777+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
778+
Knobs: base.TestingKnobs{
779+
Inspect: &sql.InspectTestingKnobs{
780+
OnInspectJobStart: func() error {
781+
// Block the INSPECT job from starting until the DROP INDEX completes.
782+
const maxWait = 30 * time.Second
783+
select {
784+
case <-unblockCh:
785+
return nil
786+
case <-time.After(maxWait):
787+
return errors.New("timed out waiting for DROP INDEX to complete")
788+
}
789+
},
790+
},
791+
},
792+
})
793+
defer s.Stopper().Stop(ctx)
794+
r := sqlutils.MakeSQLRunner(db)
795+
796+
r.ExecMultiple(t,
797+
`SET CLUSTER SETTING jobs.registry.interval.adopt = '500ms'`,
798+
`SET enable_inspect_command = true`,
799+
`CREATE DATABASE t`,
800+
`CREATE TABLE t.pk_swap (
801+
old_pk INT PRIMARY KEY,
802+
new_pk INT UNIQUE NOT NULL,
803+
payload INT NOT NULL,
804+
INDEX payload_idx (payload)
805+
)`,
806+
`INSERT INTO t.pk_swap SELECT i, i+100, i+200 FROM generate_series(1, 5) AS g(i)`,
807+
)
808+
809+
r.Exec(t, `INSPECT TABLE t.pk_swap WITH OPTIONS INDEX ALL, DETACHED`)
810+
r.Exec(t, `DROP INDEX t.pk_swap@payload_idx`)
811+
// Unblock the INSPECT job now that the DROP INDEX has completed.
812+
close(unblockCh)
813+
814+
var jobID int64
815+
require.Eventually(t, func() bool {
816+
row := db.QueryRow(`SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC LIMIT 1`)
817+
return row.Scan(&jobID) == nil
818+
}, 10*time.Second, 100*time.Millisecond)
819+
820+
var (
821+
status string
822+
jobErr gosql.NullString
823+
)
824+
require.Eventually(t, func() bool {
825+
row := db.QueryRow(`SELECT status, error FROM [SHOW JOBS] WHERE job_id = $1`, jobID)
826+
if err := row.Scan(&status, &jobErr); err != nil {
827+
t.Logf("error polling job status: %v", err)
828+
return false
829+
}
830+
return status != "running" && status != "pending"
831+
}, 60*time.Second, time.Second)
832+
833+
require.Equal(t, "failed", status, "detached job should fail after schema change")
834+
require.Regexp(t, `table pk_swap \[\d+\] has had a schema change since the job has started at .+\nHINT: use AS OF SYSTEM TIME to avoid schema changes during inspection`, jobErr.String)
835+
836+
rows, err := db.Query(`SHOW INSPECT ERRORS FOR TABLE t.pk_swap WITH DETAILS`)
837+
require.NoError(t, err)
838+
defer rows.Close()
839+
require.False(t, rows.Next(), "no inspect errors are recorded when the job aborts before running checks")
840+
}

pkg/sql/inspect/inspect_job_entry.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ func ChecksForTable(
148148
}
149149
continue
150150
}
151-
check := jobspb.InspectDetails_Check{Type: jobspb.InspectCheckIndexConsistency, TableID: table.GetID(), IndexID: index.GetID()}
151+
check := jobspb.InspectDetails_Check{
152+
Type: jobspb.InspectCheckIndexConsistency,
153+
TableID: table.GetID(),
154+
IndexID: index.GetID(),
155+
TableVersion: table.GetVersion(),
156+
}
152157
checks = append(checks, &check)
153158
}
154159

@@ -184,7 +189,12 @@ func checksByIndexNames(
184189
return nil, pgerror.Newf(pgcode.InvalidName, "index %q on table %q is not supported for index consistency checking", index.GetName(), table.GetName())
185190
}
186191

187-
checks = append(checks, &jobspb.InspectDetails_Check{Type: jobspb.InspectCheckIndexConsistency, TableID: table.GetID(), IndexID: index.GetID()})
192+
checks = append(checks, &jobspb.InspectDetails_Check{
193+
Type: jobspb.InspectCheckIndexConsistency,
194+
TableID: table.GetID(),
195+
IndexID: index.GetID(),
196+
TableVersion: table.GetVersion(),
197+
})
188198
}
189199

190200
return checks, nil

pkg/sql/inspect/inspect_processor.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,16 +452,18 @@ func buildInspectCheckFactories(
452452
for _, specCheck := range spec.InspectDetails.Checks {
453453
tableID := specCheck.TableID
454454
indexID := specCheck.IndexID
455+
tableVersion := specCheck.TableVersion
455456
switch specCheck.Type {
456457
case jobspb.InspectCheckIndexConsistency:
457458
checkFactories = append(checkFactories, func(asOf hlc.Timestamp) inspectCheck {
458459
return &indexConsistencyCheck{
459460
indexConsistencyCheckApplicability: indexConsistencyCheckApplicability{
460461
tableID: tableID,
461462
},
462-
flowCtx: flowCtx,
463-
indexID: indexID,
464-
asOf: asOf,
463+
flowCtx: flowCtx,
464+
indexID: indexID,
465+
tableVersion: tableVersion,
466+
asOf: asOf,
465467
}
466468
})
467469

0 commit comments

Comments
 (0)