Skip to content

Commit ce103d7

Browse files
craig[bot] and spilchen committed
Merge #158199
158199: sql/inspect: use historical descriptors for INSPECT AS OF r=spilchen a=spilchen

Fix a bug in `INSPECT ... AS OF` that incorrectly used the latest table descriptor instead of a historical one. This could result in incorrect index ID usage if a schema change occurred after the specified AS OF time.

Since this tests schema changes interleaved with INSPECT, this will also create/start the INSPECT job in a separate transaction. This was needed to avoid getting transaction retry errors.

Informs: #158197

Release note: none

Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
2 parents b7d1a56 + 8425d41 commit ce103d7

File tree

5 files changed

+114
-51
lines changed

5 files changed

+114
-51
lines changed

pkg/sql/importer/import_job.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,6 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
351351
ctx,
352352
fmt.Sprintf("import-validation-%s", tableName),
353353
p.ExecCfg(),
354-
nil, /* txn */
355354
checks,
356355
setPublicTimestamp,
357356
)

pkg/sql/inspect/index_consistency_check.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,19 @@ func (c *indexConsistencyCheck) Close(context.Context) error {
447447
// descriptor and index metadata in the indexConsistencyCheck struct.
448448
func (c *indexConsistencyCheck) loadCatalogInfo(ctx context.Context) error {
449449
return c.flowCtx.Cfg.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
450+
if !c.asOf.IsEmpty() {
451+
if err := txn.KV().SetFixedTimestamp(ctx, c.asOf); err != nil {
452+
return err
453+
}
454+
}
455+
456+
byIDGetter := txn.Descriptors().ByIDWithLeased(txn.KV())
457+
if !c.asOf.IsEmpty() {
458+
byIDGetter = txn.Descriptors().ByIDWithoutLeased(txn.KV())
459+
}
460+
450461
var err error
451-
c.tableDesc, err = txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, c.tableID)
462+
c.tableDesc, err = byIDGetter.WithoutNonPublic().Get().Table(ctx, c.tableID)
452463
if err != nil {
453464
return err
454465
}

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
2222
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2323
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
24+
"github.com/cockroachdb/cockroach/pkg/testutils"
2425
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2526
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
2627
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -827,7 +828,7 @@ func TestInspectWithoutASOFSchemaChange(t *testing.T) {
827828
t.Logf("error polling job status: %v", err)
828829
return false
829830
}
830-
return status != "running" && status != "pending"
831+
return status == "failed"
831832
}, 60*time.Second, time.Second)
832833

833834
require.Equal(t, "failed", status, "detached job should fail after schema change")
@@ -838,3 +839,73 @@ func TestInspectWithoutASOFSchemaChange(t *testing.T) {
838839
defer rows.Close()
839840
require.False(t, rows.Next(), "no inspect errors are recorded when the job aborts before running checks")
840841
}
842+
843+
// TestInspectASOFAfterPrimaryKeySwap runs an INSPECT ASOF statement while
844+
// doing a schema change after the ASOF time.
845+
func TestInspectASOFAfterPrimaryKeySwap(t *testing.T) {
846+
defer leaktest.AfterTest(t)()
847+
defer log.Scope(t).Close(t)
848+
849+
ctx := context.Background()
850+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
851+
defer s.Stopper().Stop(ctx)
852+
r := sqlutils.MakeSQLRunner(db)
853+
854+
r.ExecMultiple(t,
855+
`SET enable_inspect_command = true`,
856+
`CREATE DATABASE t`,
857+
`CREATE TABLE t.pk_swap (
858+
old_pk INT PRIMARY KEY,
859+
new_pk INT UNIQUE NOT NULL,
860+
payload INT NOT NULL,
861+
INDEX payload_idx (payload)
862+
)`,
863+
`INSERT INTO t.pk_swap SELECT i, i+100, i+200 FROM generate_series(1, 5) AS g(i)`,
864+
)
865+
866+
var asOf string
867+
r.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&asOf)
868+
869+
// Rewrite the table descriptors by swapping the primary key.
870+
r.Exec(t, `ALTER TABLE t.pk_swap ALTER PRIMARY KEY USING COLUMNS (new_pk)`)
871+
872+
// Helper function to wait for the latest INSPECT job to complete successfully.
873+
waitForInspectJob := func() {
874+
var jobID int64
875+
var status string
876+
var fractionCompleted float64
877+
testutils.SucceedsSoon(t, func() error {
878+
row := db.QueryRow(`SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`)
879+
if err := row.Scan(&jobID, &status, &fractionCompleted); err != nil {
880+
return err
881+
}
882+
if status == "succeeded" || status == "failed" {
883+
return nil
884+
}
885+
return errors.Newf("job is not in the succeeded or failed state: %q", status)
886+
})
887+
require.Equal(t, "succeeded", status, "INSPECT job should succeed")
888+
require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
889+
requireCheckCountsMatch(t, r, jobID)
890+
}
891+
892+
t.Run("non-detached", func(t *testing.T) {
893+
r.Exec(t, fmt.Sprintf(`INSPECT TABLE t.pk_swap AS OF SYSTEM TIME '%s' WITH OPTIONS INDEX ALL`, asOf))
894+
waitForInspectJob()
895+
})
896+
897+
t.Run("detached with implicit transaction", func(t *testing.T) {
898+
r.Exec(t, fmt.Sprintf(`INSPECT TABLE t.pk_swap AS OF SYSTEM TIME '%s' WITH OPTIONS INDEX ALL, DETACHED`, asOf))
899+
waitForInspectJob()
900+
})
901+
902+
t.Run("detached with explicit transaction", func(t *testing.T) {
903+
_, err := db.Exec("BEGIN AS OF SYSTEM TIME '" + asOf + "'")
904+
require.NoError(t, err)
905+
_, err = db.Exec(fmt.Sprintf(`INSPECT TABLE t.pk_swap AS OF SYSTEM TIME '%s' WITH OPTIONS INDEX ALL, DETACHED`, asOf))
906+
require.NoError(t, err)
907+
_, err = db.Exec("COMMIT")
908+
// Expecting the commit to fail due to the schema change after ASOF time.
909+
require.Contains(t, err.Error(), "TransactionRetryError")
910+
})
911+
}

pkg/sql/inspect/inspect_job_entry.go

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/util/log"
2626
)
2727

28-
// TriggerJob starts an inspect job for the snapshot.
28+
// TriggerJob starts an inspect job for the snapshot. This always uses a fresh
29+
// transaction to create and start the job.
2930
func TriggerJob(
3031
ctx context.Context,
3132
jobRecordDescription string,
3233
execCfg *sql.ExecutorConfig,
33-
txn isql.Txn,
3434
checks []*jobspb.InspectDetails_Check,
3535
asOf hlc.Timestamp,
3636
) (*jobs.StartableJob, error) {
@@ -55,35 +55,15 @@ func TriggerJob(
5555
}
5656

5757
var job *jobs.StartableJob
58-
if txn == nil {
59-
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
60-
return execCfg.JobRegistry.CreateStartableJobWithTxn(ctx, &job, record.JobID, txn, record)
61-
}); err != nil {
62-
if job != nil {
63-
if cleanupErr := job.CleanupOnRollback(ctx); cleanupErr != nil {
64-
log.Dev.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
65-
}
58+
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) {
59+
return execCfg.JobRegistry.CreateStartableJobWithTxn(ctx, &job, record.JobID, txn, record)
60+
}); err != nil {
61+
if job != nil {
62+
if cleanupErr := job.CleanupOnRollback(ctx); cleanupErr != nil {
63+
log.Dev.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
6664
}
67-
return nil, err
68-
}
69-
} else {
70-
if err := func() (err error) {
71-
defer func() {
72-
if err == nil || job == nil {
73-
return
74-
}
75-
if cleanupErr := job.CleanupOnRollback(ctx); cleanupErr != nil {
76-
log.Dev.Errorf(ctx, "failed to cleanup job: %v", cleanupErr)
77-
}
78-
}()
79-
if err := execCfg.JobRegistry.CreateStartableJobWithTxn(ctx, &job, record.JobID, txn, record); err != nil {
80-
return err
81-
}
82-
83-
return txn.KV().Commit(ctx)
84-
}(); err != nil {
85-
return nil, err
8665
}
66+
return nil, err
8767
}
8868

8969
if err := job.Start(ctx); err != nil {

pkg/sql/inspect/inspect_plan.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func inspectPlanHook(
207207
return nil, nil, false, err
208208
}
209209

210-
if detached {
210+
if detached && !p.ExtendedEvalContext().TxnImplicit {
211211
fn := func(ctx context.Context, resultsCh chan<- tree.Datums) error {
212212
jobID := p.ExecCfg().JobRegistry.MakeJobID()
213213
jr := makeInspectJobRecord(tree.AsString(stmt), jobID, run.checks, run.asOfTimestamp)
@@ -216,12 +216,7 @@ func inspectPlanHook(
216216
); err != nil {
217217
return err
218218
}
219-
var notice pgnotice.Notice
220-
if p.ExtendedEvalContext().TxnImplicit {
221-
notice = pgnotice.Newf("INSPECT job %d running in the background", jobID)
222-
} else {
223-
notice = pgnotice.Newf("INSPECT job %d queued; will start after the current transaction commits", jobID)
224-
}
219+
notice := pgnotice.Newf("INSPECT job %d queued; will start after the current transaction commits", jobID)
225220
if err := p.SendClientNotice(ctx, notice, true /* immediateFlush */); err != nil {
226221
return err
227222
}
@@ -232,30 +227,37 @@ func inspectPlanHook(
232227
}
233228

234229
fn := func(ctx context.Context, resultsCh chan<- tree.Datums) error {
235-
// We create the job record in the planner's transaction to ensure that
236-
// the job record creation happens transactionally.
237-
plannerTxn := p.InternalSQLTxn()
238-
239-
sj, err := TriggerJob(ctx, tree.AsString(stmt), p.ExecCfg(), plannerTxn, run.checks, run.asOfTimestamp)
230+
sj, err := TriggerJob(ctx, tree.AsString(stmt), p.ExecCfg(), run.checks, run.asOfTimestamp)
240231
if err != nil {
241232
return err
242233
}
243234

244-
if err = p.SendClientNotice(
245-
ctx,
246-
pgnotice.Newf("waiting for INSPECT job to complete: %s\nIf the statement is canceled, the job will continue in the background.", sj.ID()),
247-
true, /* immediateFlush */
248-
); err != nil {
235+
var notice pgnotice.Notice
236+
if detached {
237+
notice = pgnotice.Newf("INSPECT job %d running in the background", sj.ID())
238+
} else {
239+
notice = pgnotice.Newf("waiting for INSPECT job to complete: %s\nIf the statement is canceled, the job will continue in the background.", sj.ID())
240+
}
241+
if err = p.SendClientNotice(ctx, notice, true /* immediateFlush */); err != nil {
249242
return err
250243
}
251244

252-
if err := sj.AwaitCompletion(ctx); err != nil {
245+
if detached {
246+
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID()))}
247+
return nil
248+
}
249+
250+
if err = sj.AwaitCompletion(ctx); err != nil {
253251
return err
254252
}
255253
return sj.ReportExecutionResults(ctx, resultsCh)
256254
}
257255

258-
return fn, jobs.InspectJobExecutionResultHeader, false, nil
256+
header := jobs.InspectJobExecutionResultHeader
257+
if detached {
258+
header = jobs.DetachedJobExecutionResultHeader
259+
}
260+
return fn, header, false, nil
259261
}
260262

261263
func makeInspectJobRecord(

0 commit comments

Comments
 (0)