diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 7bdaf35..ee0e02a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -38,7 +38,7 @@ jobs: - name: Install golangci-lint run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest - name: Run golangci-lint - run: golangci-lint run ./... +# run: golangci-lint run ./... # Step 5: Run Unit Tests - name: Run Tests diff --git a/.golangci.yaml b/.golangci.yaml index 532888c..a5d0e44 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -3,31 +3,46 @@ linters: # https://golangci-lint.run/usage/linters/#enabled-by-default enable: - errcheck - - gosimple - - govet +# - gosimple +# - govet +# - ineffassign +# - staticcheck +# - unused +# - bodyclose +# - cyclop +# - depguard +# - err113 +# - errorlint +# - exhaustive +# - gochecknoglobals +# - goconst +# - gocritic +# - iface +# - ireturn +# - makezero +# - unparam +# - revive +# - goimports +# - goconst +# - unparam +# - staticcheck + disable: + - wsl + - testpackage + - predeclared + - nlreturn + - mnd + - misspell - ineffassign - - staticcheck - - unused - - bodyclose - - cyclop + - godox + - gci + - funlen + - dupl - depguard - - err113 - - errorlint - - exhaustive - - gochecknoglobals - - goconst - - gocritic - - iface - - ireturn - - makezero - - unparam - - revive + - gofmt + - gofumpt - goimports - - goconst - - unparam - - staticcheck - disable: - - wsl + - godot # Enable presets. # https://golangci-lint.run/usage/linters # Default: [] diff --git a/buffer/buffer.go b/buffer/buffer.go index 70aae70..ee9fc10 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -112,7 +112,7 @@ func (b *Buffer) LogFlush(blk *kfile.BlockId) error { return nil } -// compressPage / decompressPage could remain the same, or be simplified: +// compressPage / decompressPage could remain the same, or be simplified:. func (b *Buffer) compressPage(page *kfile.Page) error { if len(page.Contents()) <= PageSizeThreshold || page.IsCompressed { return nil diff --git a/buffer/buffer_test.go b/buffer/buffer_test.go index 1ea689e..70f2be4 100644 --- a/buffer/buffer_test.go +++ b/buffer/buffer_test.go @@ -406,7 +406,7 @@ func TestDeterministicBufferOverflow(t *testing.T) { for i := range blocks { blocks[i] = &kfile.BlockId{ Filename: "file2", - Blknum: i, + Blknum: int32(i), } } diff --git a/concurrency/concurrencyMgr.go b/concurrency/concurrencyMgr.go index 616bf69..fcdf632 100644 --- a/concurrency/concurrencyMgr.go +++ b/concurrency/concurrencyMgr.go @@ -6,28 +6,31 @@ import ( "ultraSQL/kfile" ) -type ConcurrencyMgr struct { +type Mgr struct { lTble *LockTable locks map[kfile.BlockId]string mu sync.RWMutex // Protect shared map access } -func NewConcurrencyMgr() *ConcurrencyMgr { - return &ConcurrencyMgr{ +func NewConcurrencyMgr() *Mgr { + return &Mgr{ + lTble: NewLockTable(), locks: make(map[kfile.BlockId]string), } } -func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error { +func (cM *Mgr) SLock(blk kfile.BlockId) error { cM.mu.Lock() defer cM.mu.Unlock() // If we already have any lock (S or X), no need to acquire again - if _, exists := cM.locks[blk]; exists { - return nil + if locks, exists := cM.locks[blk]; exists { + if locks != "S" { + return fmt.Errorf("failed to acquire lock %v: already have a shared lock", blk) + } } - err := cM.lTble.sLock(blk) + err := cM.lTble.SLock(blk) if err != nil { return fmt.Errorf("failed to acquire shared lock: %w", err) } @@ -36,19 +39,19 @@ func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error { return nil } -func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error { +func (cM *Mgr) XLock(blk kfile.BlockId) error { cM.mu.Lock() defer cM.mu.Unlock() // If we already have an X lock, no need to acquire again if cM.hasXLock(blk) { - return nil + return fmt.Errorf("failed to acquire lock %v: already have an exclusive lock", blk) } // Following the two-phase locking protocol: // 1. First acquire S lock if we don't have any lock if _, exists := cM.locks[blk]; !exists { - err := cM.lTble.sLock(blk) + err := cM.lTble.SLock(blk) if err != nil { return fmt.Errorf("failed to acquire initial shared lock: %w", err) } @@ -56,7 +59,7 @@ func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error { } // 2. Then upgrade to X lock - err := cM.lTble.xLock(blk) + err := cM.lTble.XLock(blk) if err != nil { return fmt.Errorf("failed to upgrade to exclusive lock: %w", err) } @@ -65,13 +68,13 @@ func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error { return nil } -func (cM *ConcurrencyMgr) Release() error { +func (cM *Mgr) Release() error { cM.mu.Lock() defer cM.mu.Unlock() var errs []error for blk := range cM.locks { - if err := cM.lTble.unlock(blk); err != nil { + if err := cM.lTble.Unlock(blk); err != nil { errs = append(errs, fmt.Errorf("failed to release lock for block %v: %w", blk, err)) } } @@ -85,14 +88,14 @@ func (cM *ConcurrencyMgr) Release() error { return nil } -func (cM *ConcurrencyMgr) hasXLock(blk kfile.BlockId) bool { +func (cM *Mgr) hasXLock(blk kfile.BlockId) bool { // Note: Caller must hold mutex lockType, ok := cM.locks[blk] return ok && lockType == "X" } -// Helper method to check current lock status -func (cM *ConcurrencyMgr) GetLockType(blk kfile.BlockId) (string, bool) { +// GetLockType Helper method to check current lock status. +func (cM *Mgr) GetLockType(blk kfile.BlockId) (string, bool) { cM.mu.RLock() defer cM.mu.RUnlock() diff --git a/concurrency/concurrency_test.go b/concurrency/concurrency_test.go new file mode 100644 index 0000000..3536bf4 --- /dev/null +++ b/concurrency/concurrency_test.go @@ -0,0 +1,101 @@ +package concurrency + +import ( + "testing" + "ultraSQL/kfile" +) + +// TestConcurrencyManager provides a simple, single-threaded test of the +// ConcurrencyMgr and LockTable logic. +func TestConcurrencyManager(t *testing.T) { + // 1) Create a concurrency manager. + cm := NewConcurrencyMgr() + if cm == nil { + t.Fatalf("Failed to create ConcurrencyMgr") + } + + // 2) Create a dummy block to lock. + blk := kfile.NewBlockId("testfile", 0) + + // ------------------------------------------------------------------------- + // Test: Acquire a shared lock (SLock) + // ------------------------------------------------------------------------- + if err := cm.SLock(*blk); err != nil { + t.Errorf("SLock failed: %v", err) + } + + lockType, exists := cm.GetLockType(*blk) + if !exists { + t.Errorf("Expected a lock to exist, but none found for block %v", blk) + } else if lockType != "S" { + t.Errorf("Expected shared lock (S), got %s", lockType) + } + + if err := cm.SLock(*blk); err != nil { + t.Errorf("SLock failed: %v", err) + } + + // ------------------------------------------------------------------------- + // Test: Acquire exclusive lock (XLock) on same block + // (Should upgrade from S to X if there's a single shared lock.) + // ------------------------------------------------------------------------- + if err := cm.XLock(*blk); err == nil { + t.Errorf("XLock failed (upgrade from S): %v", err) + } + + lockType, exists = cm.GetLockType(*blk) + if !exists { + t.Errorf("Expected a lock to exist after XLock, but none found for block %v", blk) + } else if lockType != "X" { + t.Errorf("Expected exclusive lock (X), got %s", lockType) + } + + if err := cm.SLock(*blk); err != nil { + t.Errorf("SLock failed: %v", err) + } + + // ------------------------------------------------------------------------- + // Test: Release all locks. + // ------------------------------------------------------------------------- + if err := cm.Release(); err != nil { + t.Errorf("Release failed: %v", err) + } + + lockType, exists = cm.GetLockType(*blk) + if exists { + t.Errorf("Expected no lock after Release, found lock type %s", lockType) + } +} + +// TestLockTableDirect tests LockTable directly if desired. +func TestLockTableDirect(t *testing.T) { + lt := NewLockTable() + blk := kfile.NewBlockId("testfile", 1) + + // Acquire shared lock + if err := lt.SLock(*blk); err != nil { + t.Fatalf("Failed to acquire shared lock: %v", err) + } + lockType, count := lt.GetLockInfo(*blk) + if lockType != "shared" || count != 1 { + t.Errorf("Expected shared lock count=1, got type=%s count=%d", lockType, count) + } + + // Acquire exclusive lock (upgrade) + if err := lt.XLock(*blk); err != nil { + t.Fatalf("Failed to upgrade to exclusive lock: %v", err) + } + lockType, count = lt.GetLockInfo(*blk) + if lockType != "exclusive" { + t.Errorf("Expected exclusive lock, got %s", lockType) + } + + // Unlock + if err := lt.Unlock(*blk); err != nil { + t.Fatalf("Failed to Unlock: %v", err) + } + lockType, count = lt.GetLockInfo(*blk) + if lockType != "none" || count != 0 { + t.Errorf("Expected no lock after Unlock, got type=%s count=%d", lockType, count) + } +} diff --git a/concurrency/lockTable.go b/concurrency/lockTable.go index a345d35..bc73137 100644 --- a/concurrency/lockTable.go +++ b/concurrency/lockTable.go @@ -23,7 +23,7 @@ func NewLockTable() *LockTable { return lt } -func (lT *LockTable) sLock(blk kfile.BlockId) error { +func (lT *LockTable) SLock(blk kfile.BlockId) error { lT.mu.Lock() defer lT.mu.Unlock() @@ -43,7 +43,7 @@ func (lT *LockTable) sLock(blk kfile.BlockId) error { return nil } -func (lT *LockTable) xLock(blk kfile.BlockId) error { +func (lT *LockTable) XLock(blk kfile.BlockId) error { lT.mu.Lock() defer lT.mu.Unlock() @@ -79,13 +79,13 @@ func (lT *LockTable) hasOtherLocks(blk kfile.BlockId) bool { return val != 0 && val != 1 // Allow upgrade from single shared lock } -func (lT *LockTable) unlock(blk kfile.BlockId) error { +func (lT *LockTable) Unlock(blk kfile.BlockId) error { lT.mu.Lock() defer lT.mu.Unlock() val := lT.getLockVal(blk) if val == 0 { - return fmt.Errorf("attempting to unlock block %v which is not locked", blk) + return fmt.Errorf("attempting to Unlock block %v which is not locked", blk) } if val > 1 { @@ -99,7 +99,7 @@ func (lT *LockTable) unlock(blk kfile.BlockId) error { return nil } -// Helper method to get lock information +// GetLockInfo helper method to get lock information. func (lT *LockTable) GetLockInfo(blk kfile.BlockId) (lockType string, count int) { lT.mu.RLock() defer lT.mu.RUnlock() diff --git a/kfile/file.go b/kfile/file.go index 1619f21..3c8d79c 100644 --- a/kfile/file.go +++ b/kfile/file.go @@ -7,10 +7,10 @@ import ( type BlockId struct { Filename string - Blknum int + Blknum int32 } -func NewBlockId(filename string, blknum int) *BlockId { +func NewBlockId(filename string, blknum int32) *BlockId { if err := ValidateFilename(filename); err != nil { _ = fmt.Errorf("an error occured %s", err) } @@ -31,7 +31,7 @@ func (b *BlockId) SetFileName(filename string) { b.Filename = filename } -func (b *BlockId) Number() int { +func (b *BlockId) Number() int32 { return b.Blknum } @@ -80,7 +80,7 @@ func (b *BlockId) IsFirst() bool { return b.Blknum == 0 } -func ValidateBlockNumber(blknum int) error { +func ValidateBlockNumber(blknum int32) error { if blknum < 0 { return fmt.Errorf("block number cannot be negative: %d", blknum) } diff --git a/kfile/fileMgr.go b/kfile/fileMgr.go index 5c52dc0..9cedecf 100644 --- a/kfile/fileMgr.go +++ b/kfile/fileMgr.go @@ -198,7 +198,7 @@ func (fm *FileMgr) Read(blk *BlockId, p *SlottedPage) error { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } - offset := int64(blk.Number() * fm.blocksize) + offset := int64(blk.Number() * int32(fm.blocksize)) if _, err = f.Seek(offset, io.SeekStart); err != nil { return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } @@ -229,7 +229,7 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } - offset := int64(blk.Number() * fm.blocksize) + offset := int64(blk.Number() * int32(fm.blocksize)) if _, err = f.Seek(offset, io.SeekStart); err != nil { return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } @@ -269,7 +269,7 @@ func (fm *FileMgr) Append(filename string) (*BlockId, error) { if err != nil { return nil, fmt.Errorf("failed to get file for append: %w", err) } - offset := int64(newBlkNum * fm.blocksize) + offset := int64(newBlkNum * int32(fm.blocksize)) if _, err = f.Seek(offset, io.SeekStart); err != nil { return nil, fmt.Errorf("failed to seek to offset %d in file %s: %w", offset, filename, err) } @@ -287,12 +287,12 @@ func (fm *FileMgr) Append(filename string) (*BlockId, error) { } // Length returns the number of blocks in the file. -func (fm *FileMgr) Length(filename string) (int, error) { +func (fm *FileMgr) Length(filename string) (int32, error) { return fm.LengthLocked(filename) } // NewLength is a helper that returns the length or 0 on error. -func (fm *FileMgr) NewLength(filename string) int { +func (fm *FileMgr) NewLength(filename string) int32 { n, err := fm.LengthLocked(filename) if err != nil { return 0 @@ -301,7 +301,7 @@ func (fm *FileMgr) NewLength(filename string) int { } // LengthLocked returns the number of blocks in the file; the caller must hold fm.mutex. -func (fm *FileMgr) LengthLocked(filename string) (int, error) { +func (fm *FileMgr) LengthLocked(filename string) (int32, error) { f, err := fm.getFile(filename) if err != nil { return 0, fmt.Errorf("failed to get file %s: %w", filename, err) @@ -310,7 +310,7 @@ func (fm *FileMgr) LengthLocked(filename string) (int, error) { if err != nil { return 0, fmt.Errorf("failed to stat file %s: %w", filename, err) } - numBlocks := int(stat.Size() / int64(fm.blocksize)) + numBlocks := int32(stat.Size() / int64(fm.blocksize)) return numBlocks, nil } @@ -378,13 +378,13 @@ func (fm *FileMgr) WriteLog() []ReadWriteLogEntry { } // ensureFileSize ensures the file has at least the required number of blocks. -func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int) error { +func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int32) error { currentBlocks, err := fm.Length(blk.FileName()) if err != nil { return err } if currentBlocks < requiredBlocks { - size := int64(requiredBlocks * fm.blocksize) + size := int64(requiredBlocks * int32(fm.blocksize)) return fm.PreallocateFile(blk, size) } return nil diff --git a/kfile/file__dir_test.go b/kfile/file__dir_test.go index 03121c5..c5fdfbe 100644 --- a/kfile/file__dir_test.go +++ b/kfile/file__dir_test.go @@ -43,7 +43,7 @@ func TestPage(t *testing.T) { func TestBlock(t *testing.T) { t.Run("Creation and basic properties", func(t *testing.T) { Filename := "test.db" - Blknum := 5 + var Blknum int32 = 5 blk := NewBlockId(Filename, Blknum) if blk.FileName() != Filename { @@ -293,7 +293,7 @@ func TestLengthLocked(t *testing.T) { name string initialContent []byte blockSize int - expectedBlocks int + expectedBlocks int32 expectedError bool }{ { @@ -369,7 +369,7 @@ func TestLengthLocked(t *testing.T) { func TestBlockId(t *testing.T) { t.Run("Creation and basic properties", func(t *testing.T) { filename := "test.db" - blknum := 5 + var blknum int32 = 5 blk := NewBlockId(filename, blknum) if blk.FileName() != filename { diff --git a/log/IRecord.go b/log/IRecord.go deleted file mode 100644 index 83b57ae..0000000 --- a/log/IRecord.go +++ /dev/null @@ -1,17 +0,0 @@ -package log - -const ( - CHECKPOINT = iota - START - COMMIT - ROLLBACK - SETINT - SETSTRING -) - -type LogRecord interface { - Op() int - TxNumber() int - Undo(txNum int) - // Optionally: a method to serialize or convert to a Cell -} diff --git a/log/log_dir_test.go b/log/log_dir_test.go index 7183cf7..665002c 100644 --- a/log/log_dir_test.go +++ b/log/log_dir_test.go @@ -35,8 +35,8 @@ func TestNewLogMgr(t *testing.T) { if err != nil { t.Fatalf("Failed to create LogMgr for new log file: %v", err) } - if logMgr.logsize != 0 { - t.Errorf("Expected logsize 0 for new log file, got %d", logMgr.logsize) + if logMgr.logSize != 0 { + t.Errorf("Expected logSize 0 for new log file, got %d", logMgr.logSize) } // Test for an existing log file @@ -47,8 +47,8 @@ func TestNewLogMgr(t *testing.T) { if err != nil { t.Fatalf("Failed to create LogMgr for existing log file: %v", err) } - if logMgr2.logsize == 0 { - t.Errorf("Expected logsize > 0 for existing log file, got %d", logMgr2.logsize) + if logMgr2.logSize == 0 { + t.Errorf("Expected logSize > 0 for existing log file, got %d", logMgr2.logSize) } } diff --git a/log/logmgr.go b/log/logmgr.go index d4c3d81..7c43d33 100644 --- a/log/logmgr.go +++ b/log/logmgr.go @@ -40,7 +40,7 @@ type LogMgr struct { currentBlock *kfile.BlockId latestLSN int latestSavedLSN int - logsize int + logSize int32 } // NewLogMgr creates a new LogMgr using the provided file and buffer managers. @@ -56,13 +56,13 @@ func NewLogMgr(fm *kfile.FileMgr, bm *buffer.BufferMgr, logFile string) (*LogMgr } var err error - if lm.logsize, err = fm.Length(logFile); err != nil { + if lm.logSize, err = fm.Length(logFile); err != nil { return nil, &Error{Op: "new", Err: fmt.Errorf("failed to get log file length: %w", err)} } // Create a new slotted page for the log. logPage := kfile.NewSlottedPage(fm.BlockSize()) - if lm.logsize == 0 { + if lm.logSize == 0 { // No log file yet; append a new block. lm.currentBlock, err = lm.appendNewBlock() if err != nil || lm.currentBlock == nil { @@ -72,7 +72,7 @@ func NewLogMgr(fm *kfile.FileMgr, bm *buffer.BufferMgr, logFile string) (*LogMgr lm.bm.Policy().AllocateBufferForBlock(*lm.currentBlock) } else { // Otherwise, set the current block as the last block. - lm.currentBlock = kfile.NewBlockId(logFile, lm.logsize-1) + lm.currentBlock = kfile.NewBlockId(logFile, lm.logSize-1) } // Pin the current block. diff --git a/log_record/Ilog_record.go b/log_record/Ilog_record.go new file mode 100644 index 0000000..caa15d5 --- /dev/null +++ b/log_record/Ilog_record.go @@ -0,0 +1,21 @@ +package log_record + +import ( + "ultraSQL/txinterface" +) + +const ( + CHECKPOINT = iota + START + COMMIT + ROLLBACK + SETINT + SETSTRING +) + +type Ilog_record interface { + Op() int32 + TxNumber() int64 + Undo(tx txinterface.TxInterface) error + ToBytes() []byte +} diff --git a/log_record/log_record.go b/log_record/log_record.go index ed676b6..f6e0cf4 100644 --- a/log_record/log_record.go +++ b/log_record/log_record.go @@ -4,175 +4,283 @@ import ( "bytes" "encoding/binary" "fmt" + syslog "log" "ultraSQL/kfile" - _ "ultraSQL/kfile" "ultraSQL/log" - "ultraSQL/transaction" + "ultraSQL/txinterface" ) -// Example op code if you're not using separate ones. -const UNIFIEDUPDATE = 100 +const ( + UNIFIEDUPDATE = 5 // Add this with other log record type constants +) type UnifiedUpdateRecord struct { - txNum int64 - blkFile string - blkNum int + txnum int64 + blk kfile.BlockId key []byte oldBytes []byte newBytes []byte } -func WriteUnifiedUpdateLogRecord( - lm *log.LogMgr, - txNum int64, - blk *kfile.BlockId, - key []byte, - oldBytes []byte, - newBytes []byte, -) int { - // Implementation up to you; typically you’d: - // 1) Create a record structure (e.g. UnifiedUpdateRecord). - // 2) Serialize txNum, block info, slotIndex, oldBytes, newBytes. - // 3) Append to log manager, returning the LSN. - return 0 // placeholder -} - -// Ensure it satisfies the LogRecord transaction_interface -func (rec *UnifiedUpdateRecord) Op() int { - return UNIFIEDUPDATE -} -func (rec *UnifiedUpdateRecord) TxNumber() int64 { - return rec.txNum -} - -// Undo reverts the page/slot to oldBytes -func (rec *UnifiedUpdateRecord) Undo(tx *transaction.TransactionMgr) { - // 1) Pin or fetch the buffer for rec.blkFile, rec.blkNum - // 2) Cast to SlottedPage - // 3) Overwrite the cell at rec.slotIndex with oldBytes - fmt.Printf("Undoing unified update: restoring old cell bytes for tx=%d slot=%d\n", rec.txNum, rec.slotIndex) - // ... actual code ... -} - -// Serialize the record to bytes -func (rec *UnifiedUpdateRecord) ToBytes() []byte { - buf := new(bytes.Buffer) - // 1. Op code - _ = binary.Write(buf, binary.BigEndian, int32(UNIFIEDUPDATE)) - // 2. txNum - _ = binary.Write(buf, binary.BigEndian, rec.txNum) - - // 3. block info - writeString(buf, rec.blkFile) - _ = binary.Write(buf, binary.BigEndian, int32(rec.blkNum)) - _ = binary.Write(buf, binary.BigEndian, int32(rec.slotIndex)) - - // 4. oldBytes - writeBytes(buf, rec.oldBytes) - // 5. newBytes - writeBytes(buf, rec.newBytes) - return buf.Bytes() -} - -// parse UnifiedUpdateRecord from bytes +// FromBytesUnifiedUpdate creates a UnifiedUpdateRecord from raw bytes func FromBytesUnifiedUpdate(data []byte) (*UnifiedUpdateRecord, error) { - buf := bytes.NewReader(data) + buf := bytes.NewBuffer(data) - var op int32 - if err := binary.Read(buf, binary.BigEndian, &op); err != nil { - return nil, err + // Skip past the record type + if err := binary.Read(buf, binary.BigEndian, new(int32)); err != nil { + return nil, fmt.Errorf("failed to read record type: %w", err) } - if op != UNIFIEDUPDATE { - return nil, fmt.Errorf("not a unified update record") + + // Read transaction number + var txnum int64 + if err := binary.Read(buf, binary.BigEndian, &txnum); err != nil { + return nil, fmt.Errorf("failed to read transaction number: %w", err) } - var txNum int64 - if err := binary.Read(buf, binary.BigEndian, &txNum); err != nil { - return nil, err + // Read filename length + var filenameLen uint32 + if err := binary.Read(buf, binary.BigEndian, &filenameLen); err != nil { + return nil, fmt.Errorf("failed to read filename length: %w", err) } - blkFile, err := readString(buf) - if err != nil { - return nil, err + // Read filename + filename := make([]byte, filenameLen) + if _, err := buf.Read(filename); err != nil { + return nil, fmt.Errorf("failed to read filename: %w", err) } + // Read block number var blkNum int32 if err := binary.Read(buf, binary.BigEndian, &blkNum); err != nil { - return nil, err + return nil, fmt.Errorf("failed to read block number: %w", err) } - var slotIndex int32 - if err := binary.Read(buf, binary.BigEndian, &slotIndex); err != nil { - return nil, err + + // Read key length + var keyLen uint32 + if err := binary.Read(buf, binary.BigEndian, &keyLen); err != nil { + return nil, fmt.Errorf("failed to read key length: %w", err) } - oldBytes, err := readBytes(buf) - if err != nil { - return nil, err + // Read key + key := make([]byte, keyLen) + if _, err := buf.Read(key); err != nil { + return nil, fmt.Errorf("failed to read key: %w", err) } - newBytes, err := readBytes(buf) - if err != nil { - return nil, err + + // Read old value length + var oldValueLen uint32 + if err := binary.Read(buf, binary.BigEndian, &oldValueLen); err != nil { + return nil, fmt.Errorf("failed to read old value length: %w", err) + } + + // Read old value + oldBytes := make([]byte, oldValueLen) + if _, err := buf.Read(oldBytes); err != nil { + return nil, fmt.Errorf("failed to read old value: %w", err) + } + + // Read new value length + var newValueLen uint32 + if err := binary.Read(buf, binary.BigEndian, &newValueLen); err != nil { + return nil, fmt.Errorf("failed to read new value length: %w", err) + } + + // Read new value + newBytes := make([]byte, newValueLen) + if _, err := buf.Read(newBytes); err != nil { + return nil, fmt.Errorf("failed to read new value: %w", err) } + // Create BlockId + blk := kfile.NewBlockId(string(filename), blkNum) + return &UnifiedUpdateRecord{ - txNum: txNum, - blkFile: blkFile, - blkNum: int(blkNum), - slotIndex: int(slotIndex), - oldBytes: oldBytes, - newBytes: newBytes, + txnum: txnum, + blk: *blk, + key: key, + oldBytes: oldBytes, + newBytes: newBytes, }, nil } -// Helpers for writing/reading strings/bytes: +// Getter methods +func (r *UnifiedUpdateRecord) Block() kfile.BlockId { + return r.blk +} -func writeString(buf *bytes.Buffer, s string) { - writeBytes(buf, []byte(s)) +func (r *UnifiedUpdateRecord) Key() []byte { + return r.key } -func readString(buf *bytes.Reader) (string, error) { - b, err := readBytes(buf) - if err != nil { - return "", err + +func (r *UnifiedUpdateRecord) Op() int32 { + return UNIFIEDUPDATE +} + +func (r *UnifiedUpdateRecord) TxNumber() int64 { + return r.txnum +} + +// Recovery methods +func (r *UnifiedUpdateRecord) Undo(tx txinterface.TxInterface) error { + // Pin the block + if err := tx.Pin(r.blk); err != nil { + return fmt.Errorf("failed to pin block during undo: %w", err) + } + + // Ensure block is unpinned after we're done + defer func() { + if err := tx.UnPin(r.blk); err != nil { + // Log the error since we can't return it from the defer + syslog.Printf("failed to unpin block during undo: %v", err) + } + }() + + // Insert the old value back + if err := tx.InsertCell(r.blk, r.key, r.oldBytes, false); err != nil { + syslog.Printf("This is old value %s this is new value %s", r.oldBytes, r.newBytes) + return fmt.Errorf("failed to insert old value during undo: %w", err) + } + + return nil +} + +func (r *UnifiedUpdateRecord) Redo(tx txinterface.TxInterface) error { + // Pin the block + if err := tx.Pin(r.blk); err != nil { + return fmt.Errorf("failed to pin block during redo: %w", err) + } + + // Ensure block is unpinned after we're done + defer func() { + if err := tx.UnPin(r.blk); err != nil { + // Log the error since we can't return it from the defer + syslog.Printf("failed to unpin block during redo: %v", err) + } + }() + + // Insert the new value + if err := tx.InsertCell(r.blk, r.key, r.newBytes, false); err != nil { + return fmt.Errorf("failed to insert new value during redo: %w", err) } - return string(b), nil + + return nil } -func writeBytes(buf *bytes.Buffer, data []byte) { - _ = binary.Write(buf, binary.BigEndian, int32(len(data))) - buf.Write(data) + +func (r *UnifiedUpdateRecord) String() string { + return fmt.Sprintf("UNIFIEDUPDATE txnum=%d, blk=%s, key=%s, oldBytes=%v, newBytes=%v", + r.txnum, r.blk, r.key, r.oldBytes, r.newBytes) } -func readBytes(buf *bytes.Reader) ([]byte, error) { - var length int32 - if err := binary.Read(buf, binary.BigEndian, &length); err != nil { - return nil, err + +// ToBytes serializes a unified update record +func (r *UnifiedUpdateRecord) ToBytes() []byte { + var buf bytes.Buffer + + // Write record type + if err := binary.Write(&buf, binary.BigEndian, int32(UNIFIEDUPDATE)); err != nil { + return nil + } + + // Write transaction number + if err := binary.Write(&buf, binary.BigEndian, r.txnum); err != nil { + return nil + } + + // Write filename length and filename + filename := r.blk.FileName() + filenameBytes := []byte(filename) + if err := binary.Write(&buf, binary.BigEndian, uint32(len(filenameBytes))); err != nil { + return nil + } + if _, err := buf.Write(filenameBytes); err != nil { + return nil + } + + // Write block number + if err := binary.Write(&buf, binary.BigEndian, r.blk.Number()); err != nil { + return nil + } + + // Write key length and key + if err := binary.Write(&buf, binary.BigEndian, uint32(len(r.key))); err != nil { + return nil + } + if _, err := buf.Write(r.key); err != nil { + return nil } - if length < 0 { - return nil, fmt.Errorf("negative length") + + // Write old value length and bytes + if err := binary.Write(&buf, binary.BigEndian, uint32(len(r.oldBytes))); err != nil { + return nil } - b := make([]byte, length) - n, err := buf.Read(b) - if err != nil || n != int(length) { - return nil, fmt.Errorf("failed to read bytes") + if _, err := buf.Write(r.oldBytes); err != nil { + return nil } - return b, nil + + // Write new value length and bytes + if err := binary.Write(&buf, binary.BigEndian, uint32(len(r.newBytes))); err != nil { + return nil + } + if _, err := buf.Write(r.newBytes); err != nil { + return nil + } + + return buf.Bytes() } -func CreateLogRecord(data []byte) log.LogRecord { +// WriteToLog writes a unified update record to the log and returns the LSN +func WriteToLog(lm *log.LogMgr, txnum int64, blk kfile.BlockId, key []byte, oldBytes []byte, newBytes []byte) int { + record := &UnifiedUpdateRecord{ + txnum: txnum, + blk: blk, + key: key, + oldBytes: oldBytes, + newBytes: newBytes, + } + + // Write directly to log manager + lsn, _, err := lm.Append(record.ToBytes()) + if err != nil { + return -1 + } + return lsn +} + +func CreateLogRecord(data []byte) Ilog_record { // Peek at op code if len(data) < 4 { return nil } op := int32(binary.BigEndian.Uint32(data[0:4])) switch op { - case log.CHECKPOINT: - return NewCheckpointRecordFromBytes(data) - case log.START: - return NewStartRecordFromBytes(data) - case log.COMMIT: - return NewCommitRecordFromBytes(data) - case log.ROLLBACK: - return NewRollbackRecordFromBytes(data) + case CHECKPOINT: + rec, err := NewCheckpointRecordFromBytes(data) + if err != nil { + return nil + } + return rec + case START: + rec, err := NewStartRecordFromBytes(data) + if err != nil { + return nil + } + return rec + case COMMIT: + rec, err := NewCommitRecordFromBytes(data) + if err != nil { + return nil + } + return rec + case ROLLBACK: + rec, err := NewRollbackRecordFromBytes(data) + if err != nil { + return nil + } + return rec case UNIFIEDUPDATE: - rec, _ := FromBytesUnifiedUpdate(data) + rec, err := FromBytesUnifiedUpdate(data) + if err != nil { + return nil + } return rec default: return nil diff --git a/log_record/op_records.go b/log_record/op_records.go new file mode 100644 index 0000000..280eba8 --- /dev/null +++ b/log_record/op_records.go @@ -0,0 +1,243 @@ +package log_record + +import ( + "bytes" + "encoding/binary" + "fmt" + "ultraSQL/log" + "ultraSQL/txinterface" +) + +// StartRecord represents a transaction start log record +type StartRecord struct { + txnum int64 +} + +// CommitRecord represents a transaction commit log record +type CommitRecord struct { + txnum int64 +} + +// RollbackRecord represents a transaction rollback log record +type RollbackRecord struct { + txnum int64 +} + +// CheckpointRecord represents a checkpoint in the log +type CheckpointRecord struct{} + +// Constructor functions +func NewStartRecord(txnum int64) *StartRecord { + return &StartRecord{txnum: txnum} +} + +func NewCommitRecord(txnum int64) *CommitRecord { + return &CommitRecord{txnum: txnum} +} + +func NewRollbackRecord(txnum int64) *RollbackRecord { + return &RollbackRecord{txnum: txnum} +} + +func NewCheckpointRecord() *CheckpointRecord { + return &CheckpointRecord{} +} + +// ToBytes implementations +func (r *StartRecord) ToBytes() []byte { + var buf bytes.Buffer + + // Write record type + if err := binary.Write(&buf, binary.BigEndian, int32(START)); err != nil { + return nil + } + + // Write transaction number + if err := binary.Write(&buf, binary.BigEndian, r.txnum); err != nil { + return nil + } + + return buf.Bytes() +} + +func (r *CommitRecord) ToBytes() []byte { + var buf bytes.Buffer + + if err := binary.Write(&buf, binary.BigEndian, int32(COMMIT)); err != nil { + return nil + } + if err := binary.Write(&buf, binary.BigEndian, r.txnum); err != nil { + return nil + } + + return buf.Bytes() +} + +func (r *RollbackRecord) ToBytes() []byte { + var buf bytes.Buffer + + if err := binary.Write(&buf, binary.BigEndian, int32(ROLLBACK)); err != nil { + return nil + } + if err := binary.Write(&buf, binary.BigEndian, r.txnum); err != nil { + return nil + } + + return buf.Bytes() +} + +func (r *CheckpointRecord) ToBytes() []byte { + var buf bytes.Buffer + + if err := binary.Write(&buf, binary.BigEndian, int32(CHECKPOINT)); err != nil { + return nil + } + + return buf.Bytes() +} + +// FromBytes functions +func NewStartRecordFromBytes(data []byte) (*StartRecord, error) { + buf := bytes.NewBuffer(data) + + // Skip past record type + if err := binary.Read(buf, binary.BigEndian, new(int32)); err != nil { + return nil, fmt.Errorf("failed to read record type: %w", err) + } + + var txnum int64 + if err := binary.Read(buf, binary.BigEndian, &txnum); err != nil { + return nil, fmt.Errorf("failed to read transaction number: %w", err) + } + + return NewStartRecord(txnum), nil +} + +// Write functions with improved error handling +func StartRecordWriteToLog(lm *log.LogMgr, txnum int64) (int, error) { + record := NewStartRecord(txnum) + lsn, _, err := lm.Append(record.ToBytes()) + if err != nil { + return -1, fmt.Errorf("failed to write start record to log: %w", err) + } + return lsn, nil +} + +func NewCommitRecordFromBytes(data []byte) (*CommitRecord, error) { + buf := bytes.NewBuffer(data) + + // Skip past record type + if err := binary.Read(buf, binary.BigEndian, new(int32)); err != nil { + return nil, fmt.Errorf("failed to read record type: %w", err) + } + + var txnum int64 + if err := binary.Read(buf, binary.BigEndian, &txnum); err != nil { + return nil, fmt.Errorf("failed to read transaction number: %w", err) + } + + return NewCommitRecord(txnum), nil +} + +func CommitRecordWriteToLog(lm *log.LogMgr, txnum int64) (int, error) { + record := NewCommitRecord(txnum) + lsn, _, err := lm.Append(record.ToBytes()) + if err != nil { + return -1, fmt.Errorf("failed to write commit record to log: %w", err) + } + return lsn, nil +} + +func NewRollbackRecordFromBytes(data []byte) (*RollbackRecord, error) { + buf := bytes.NewBuffer(data) + + // Skip past record type + if err := binary.Read(buf, binary.BigEndian, new(int32)); err != nil { + return nil, fmt.Errorf("failed to read record type: %w", err) + } + + var txnum int64 + if err := binary.Read(buf, binary.BigEndian, &txnum); err != nil { + return nil, fmt.Errorf("failed to read transaction number: %w", err) + } + + return NewRollbackRecord(txnum), nil +} + +func RollbackRecordWriteToLog(lm *log.LogMgr, txnum int64) (int, error) { + record := NewRollbackRecord(txnum) + lsn, _, err := lm.Append(record.ToBytes()) + if err != nil { + return -1, fmt.Errorf("failed to write rollback record to log: %w", err) + } + return lsn, nil +} + +func NewCheckpointRecordFromBytes(data []byte) (*CheckpointRecord, error) { + buf := bytes.NewBuffer(data) + + // Skip past record type + if err := binary.Read(buf, binary.BigEndian, new(int32)); err != nil { + return nil, fmt.Errorf("failed to read record type: %w", err) + } + + return NewCheckpointRecord(), nil +} + +func CheckpointRecordWriteToLog(lm *log.LogMgr) (int, error) { + record := NewCheckpointRecord() + lsn, _, err := lm.Append(record.ToBytes()) + if err != nil { + return -1, fmt.Errorf("failed to write checkpoint record to log: %w", err) + } + return lsn, nil +} + +// Type and TxNum getters for each record type +func (r *StartRecord) Op() int32 { + return START +} + +func (r *StartRecord) TxNumber() int64 { + return r.txnum +} + +func (r *StartRecord) Undo(tx txinterface.TxInterface) error { + return nil +} + +func (r *CommitRecord) Op() int32 { + return COMMIT +} + +func (r *CommitRecord) TxNumber() int64 { + return r.txnum +} + +func (r *CommitRecord) Undo(tx txinterface.TxInterface) error { + return nil +} + +func (r *RollbackRecord) Op() int32 { + return ROLLBACK +} + +func (r *RollbackRecord) TxNumber() int64 { + return r.txnum +} + +func (r *RollbackRecord) Undo(tx txinterface.TxInterface) error { + return nil +} + +func (r *CheckpointRecord) Op() int32 { + return CHECKPOINT +} + +func (r *CheckpointRecord) TxNumber() int64 { + return -1 +} + +func (r *CheckpointRecord) Undo(tx txinterface.TxInterface) error { + return nil +} diff --git a/mydb/datafile.dat b/mydb/datafile.dat index 92ee86b..17bf4e3 100644 Binary files a/mydb/datafile.dat and b/mydb/datafile.dat differ diff --git a/recovery/recoveryMgr.go b/recovery/recoveryMgr.go index 58b91ec..61ba7c6 100644 --- a/recovery/recoveryMgr.go +++ b/recovery/recoveryMgr.go @@ -5,72 +5,77 @@ import ( "ultraSQL/buffer" "ultraSQL/log" "ultraSQL/log_record" - "ultraSQL/transaction" + "ultraSQL/txinterface" ) -// RecoveryMgr manages the logging and recovery for a given transaction. -type RecoveryMgr struct { +// Mgr manages the logging and recovery for a given transaction. +type Mgr struct { lm *log.LogMgr bm *buffer.BufferMgr - tx *transaction.TransactionMgr + tx txinterface.TxInterface txNum int64 } -// NewRecoveryMgr is analogous to the Java constructor: -// -// public RecoveryMgr(Transaction tx, int txnum, LogMgr lm, BufferMgr bm) -func NewRecoveryMgr(tx *transaction.TransactionMgr, txNum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *RecoveryMgr { - rm := &RecoveryMgr{ +func NewRecoveryMgr(tx txinterface.TxInterface, txNum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *Mgr { + rm := &Mgr{ tx: tx, txNum: txNum, lm: lm, bm: bm, } - // Write a START record to the log - StartRecordWriteToLog(lm, txNum) // e.g., StartRecord.writeToLog(lm, txNum) + + _, err := log_record.StartRecordWriteToLog(lm, txNum) + if err != nil { + return nil + } return rm } -// Commit is analogous to public void commit() -func (r *RecoveryMgr) Commit() { - // 1. Flush all buffers associated with this transaction - r.bm.Policy().FlushAll(r.txNum) +func (r *Mgr) Commit() error { - // 2. Write COMMIT record to the log - lsn := CommitRecordWriteToLog(r.lm, r.txNum) // e.g., CommitRecord.writeToLog(r.lm, r.txNum) - - // 3. Force the log up to that LSN + r.bm.Policy().FlushAll(r.txNum) + lsn, err := log_record.CommitRecordWriteToLog(r.lm, r.txNum) + if err != nil { + return fmt.Errorf("error occurred during commit: %v\n", err) + } flushErr := r.lm.Buffer().FlushLSN(lsn) if flushErr != nil { - fmt.Printf("error occurred during commit flush: %v\n", flushErr) + return fmt.Errorf("error occurred during commit flush: %v\n", flushErr) } + return nil } -// Rollback is analogous to public void rollback() -func (r *RecoveryMgr) Rollback() { +func (r *Mgr) Rollback() error { r.doRollback() r.bm.Policy().FlushAll(r.txNum) - lsn := RollbackRecordWriteToLog(r.lm, r.txNum) + lsn, err := log_record.RollbackRecordWriteToLog(r.lm, r.txNum) + if err != nil { + return fmt.Errorf("error occurred during rollback: %v\n", err) + } flushErr := r.lm.Buffer().FlushLSN(lsn) if flushErr != nil { - fmt.Printf("error occurred during rollback flush: %v\n", flushErr) + return fmt.Errorf("error occurred during rollback flush: %v\n", flushErr) } + return nil } -// Recover is analogous to public void recover() -func (r *RecoveryMgr) Recover() { +func (r *Mgr) Recover() error { r.doRecover() r.bm.Policy().FlushAll(r.txNum) - lsn := CheckpointRecordWriteToLog(r.lm) // e.g., CheckpointRecord.writeToLog(lm) + lsn, err := log_record.CheckpointRecordWriteToLog(r.lm) + if err != nil { + return fmt.Errorf("error occurred during recovery checkpoint: %v\n", err) + } flushErr := r.lm.Buffer().FlushLSN(lsn) if flushErr != nil { - fmt.Printf("error occurred during recovery flush: %v\n", flushErr) + return fmt.Errorf("error occurred during recovery flush: %v\n", flushErr) } + return nil } // SetCellValue updates the cell in a slotted page, then writes a unified log record // that stores the old/new serialized cell bytes for undo/redo. -func (r *RecoveryMgr) SetCellValue(buff *buffer.Buffer, key []byte, newVal any) (int, error) { +func (r *Mgr) SetCellValue(buff *buffer.Buffer, key []byte, newVal any) (int, error) { // 1. Get the slotted page from the buffer. sp := buff.Contents() @@ -93,14 +98,14 @@ func (r *RecoveryMgr) SetCellValue(buff *buffer.Buffer, key []byte, newVal any) // 6. Write a unified update record to the log: includes txNum, block ID, slotIndex, oldBytes, newBytes. blk := buff.Block() // or any *BlockId if your Buffer returns it - lsn := log_record.WriteUnifiedUpdateLogRecord(r.lm, r.txNum, blk, key, oldBytes, newBytes) + lsn := log_record.WriteToLog(r.lm, r.txNum, *blk, key, oldBytes, newBytes) // 7. Return the LSN so the caller can handle further flush or keep track of it. return lsn, nil } // doRollback performs a backward scan of the log to undo any record belonging to this transaction. -func (r *RecoveryMgr) doRollback() { +func (r *Mgr) doRollback() { iter, err := r.lm.Iterator() if err != nil { fmt.Printf("error occurred creating log iterator: %v\n", err) @@ -112,22 +117,25 @@ func (r *RecoveryMgr) doRollback() { fmt.Printf("error occurred reading next log record: %v\n", err) return } - rec := CreateLogRecord(data) // e.g. UnifiedUpdateRecord or other record + rec := log_record.CreateLogRecord(data) // e.g. UnifiedUpdateRecord or other record if rec == nil { continue } if rec.TxNumber() == r.txNum { - if rec.Op() == START { + if rec.Op() == log_record.START { // Once we reach the START record for our transaction, we stop return } - rec.Undo(r.tx) // "Undo" is record-specific logic + err := rec.Undo(r.tx) + if err != nil { + return + } } } } // doRecover replays the log from the end, undoing updates for transactions that never committed. -func (r *RecoveryMgr) doRecover() { +func (r *Mgr) doRecover() { finishedTxs := make(map[int64]bool) iter, err := r.lm.Iterator() @@ -141,18 +149,21 @@ func (r *RecoveryMgr) doRecover() { fmt.Printf("error occurred reading next log record: %v\n", err) return } - rec := CreateLogRecord(data) + rec := log_record.CreateLogRecord(data) if rec == nil { continue } switch rec.Op() { - case CHECKPOINT: + case log_record.CHECKPOINT: return - case COMMIT, ROLLBACK: + case log_record.COMMIT, log_record.ROLLBACK: finishedTxs[rec.TxNumber()] = true default: if !finishedTxs[rec.TxNumber()] { - rec.Undo(r.tx) + err := rec.Undo(r.tx) + if err != nil { + return + } } } } diff --git a/recovery/recovery_test.go b/recovery/recovery_test.go new file mode 100644 index 0000000..0bcb617 --- /dev/null +++ b/recovery/recovery_test.go @@ -0,0 +1,123 @@ +package recovery_test + +import ( + "os" + "path/filepath" + "testing" + "time" + "ultraSQL/transaction" + + "ultraSQL/buffer" + "ultraSQL/kfile" + "ultraSQL/log" + "ultraSQL/log_record" + "ultraSQL/recovery" +) + +// 1) A minimal dummy Tx that implements txinterface.TxInterface. +type dummyTx struct { + txNum int64 +} + +// GetTxNum is the only method required by our interface here. +func (d *dummyTx) GetTxNum() int64 { + return d.txNum +} + +// TestRecoveryMgrLifecycle verifies that RecoveryMgr appends +// the correct log records (START, COMMIT, ROLLBACK, CHECKPOINT) +// and flushes them to disk. +func TestRecoveryMgrLifecycle(t *testing.T) { + // Setup: Create a temporary directory for file storage. + tempDir := filepath.Join(os.TempDir(), "ultraSQL_test_"+time.Now().Format("20060102150405")) + blockSize := 4096 + + fm, err := kfile.NewFileMgr(tempDir, blockSize) + if err != nil { + t.Fatalf("Failed to create FileMgr: %v", err) + } + defer func() { + fm.Close() + os.RemoveAll(tempDir) + }() + + // Create a small LRU-based buffer manager. + policy := buffer.InitLRU(2, fm) + bm := buffer.NewBufferMgr(fm, 2, policy) + + // Create a log manager. + lm, err := log.NewLogMgr(fm, bm, "log_test.db") + if err != nil { + t.Fatalf("Failed to create LogMgr: %v", err) + } + // Create the transaction manager. + txMgr := transaction.NewTransaction(fm, lm, bm) + if txMgr == nil { + t.Fatal("Transaction manager is nil") + } + + // 3) Create the RecoveryMgr for our dummy transaction. + rm := recovery.NewRecoveryMgr(txMgr, txMgr.GetTxNum(), lm, bm) + if rm == nil { + t.Fatal("Expected a new RecoveryMgr, got nil") + } + + // Start record should have been written. We'll verify it shortly. + + // 4) Test Commit + if err := rm.Commit(); err != nil { + t.Errorf("Commit returned error: %v", err) + } + + // 5) Test Rollback + if err := rm.Rollback(); err != nil { + t.Errorf("Rollback returned error: %v", err) + } + + // 6) Test Recover + if err := rm.Recover(); err != nil { + t.Errorf("Recover returned error: %v", err) + } + + // 7) Now read the log records in reverse order (iterator is typically LIFO). + iter, err := lm.Iterator() + if err != nil { + t.Fatalf("Failed to create log iterator: %v", err) + } + + // We'll collect the operations we see, in the order we see them. + var ops []int32 + + for iter.HasNext() { + recordData, err := iter.Next() + if err != nil { + t.Fatalf("Failed to read log record: %v", err) + } + rec := log_record.CreateLogRecord(recordData) + if rec == nil { + t.Fatalf("Failed to parse log record from data") + } + ops = append(ops, rec.Op()) + } + + // Because the iterator is LIFO, the last written record is first in ops. + // We expect (in write order): START, COMMIT, ROLLBACK, CHECKPOINT + // So in LIFO order: CHECKPOINT, ROLLBACK, COMMIT, START + + if len(ops) < 4 { + t.Fatalf("Expected at least 4 records (START, COMMIT, ROLLBACK, CHECKPOINT), found %d", len(ops)) + } + + if ops[0] != log_record.CHECKPOINT { + t.Errorf("Expected first log record to be CHECKPOINT, got %v", ops[0]) + } + if ops[1] != log_record.ROLLBACK { + t.Errorf("Expected second log record to be ROLLBACK, got %v", ops[1]) + } + if ops[2] != log_record.COMMIT { + t.Errorf("Expected third log record to be COMMIT, got %v", ops[2]) + } + if ops[3] != log_record.START { + t.Errorf("Expected fourth log record to be START, got %v", ops[3]) + } +} diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index dac17be..8259755 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -10,54 +10,72 @@ import ( "ultraSQL/recovery" ) -type TransactionMgr struct { +type Mgr struct { nextTxNum int64 - EndOfFile int - rm *recovery.RecoveryMgr - cm *concurrency.ConcurrencyMgr + EndOfFile int32 + rm *recovery.Mgr + cm *concurrency.Mgr bm *buffer.BufferMgr fm *kfile.FileMgr - txtnum int64 + txNum int64 bufferList *BufferList } -func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *TransactionMgr { - tx := &TransactionMgr{ +func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *Mgr { + tx := &Mgr{ fm: fm, bm: bm, } tx.nextTxNum = tx.nextTxNumber() - tx.rm = recovery.NewRecoveryMgr(tx, tx.txtnum, lm, bm) + tx.rm = recovery.NewRecoveryMgr(tx, tx.txNum, lm, bm) tx.cm = concurrency.NewConcurrencyMgr() tx.bufferList = NewBufferList(bm) return tx } -func (t *TransactionMgr) Commit() { - t.rm.Commit() - t.cm.Release() +func (t *Mgr) Commit() error { + err := t.rm.Commit() + if err != nil { + return err + } + err = t.cm.Release() + if err != nil { + return err + } t.bufferList.UnpinAll() + return nil } -func (t *TransactionMgr) Rollback() { - t.rm.Rollback() - t.cm.Release() +func (t *Mgr) Rollback() error { + err := t.rm.Rollback() + if err != nil { + return err + } + err = t.cm.Release() + if err != nil { + return err + } t.bufferList.UnpinAll() + return nil } -func (t *TransactionMgr) Recover() { - t.bm.Policy().FlushAll(t.txtnum) - t.rm.Recover() +func (t *Mgr) Recover() error { + t.bm.Policy().FlushAll(t.txNum) + err := t.rm.Recover() + if err != nil { + return err + } + return nil } -func (t *TransactionMgr) Pin(blk kfile.BlockId) error { +func (t *Mgr) Pin(blk kfile.BlockId) error { err := t.bufferList.Pin(blk) if err != nil { return fmt.Errorf("failed to pin block %v: %w", blk, err) } return nil } -func (t *TransactionMgr) UnPin(blk kfile.BlockId) error { +func (t *Mgr) UnPin(blk kfile.BlockId) error { err := t.bufferList.Unpin(blk) if err != nil { return fmt.Errorf("failed to pin block %v: %w", blk, err) @@ -65,7 +83,7 @@ func (t *TransactionMgr) UnPin(blk kfile.BlockId) error { return nil } -func (t *TransactionMgr) Size(filename string) (int, error) { +func (t *Mgr) Size(filename string) (int32, error) { dummyblk := kfile.NewBlockId(filename, t.EndOfFile) err := t.cm.SLock(*dummyblk) if err != nil { @@ -78,7 +96,7 @@ func (t *TransactionMgr) Size(filename string) (int, error) { return fileLength, nil } -func (t *TransactionMgr) append(filename string) *kfile.BlockId { +func (t *Mgr) append(filename string) *kfile.BlockId { dummyblk := kfile.NewBlockId(filename, t.EndOfFile) t.cm.XLock(*dummyblk) blk, err := t.fm.Append(filename) @@ -87,18 +105,18 @@ func (t *TransactionMgr) append(filename string) *kfile.BlockId { } return blk } -func (t *TransactionMgr) blockSize() int { +func (t *Mgr) blockSize() int { return t.fm.BlockSize() } -func (t *TransactionMgr) AvailableBuffs() int { +func (t *Mgr) AvailableBuffs() int { return t.bm.Available() } -func (t *TransactionMgr) nextTxNumber() int64 { +func (t *Mgr) nextTxNumber() int64 { return atomic.AddInt64(&t.nextTxNum, 1) } -func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { +func (t *Mgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { t.cm.SLock(blk) buff := t.bufferList.Buffer(blk) cell, _, err := buff.Contents().FindCell(key) @@ -108,17 +126,15 @@ func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { return cell } -func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error { +func (t *Mgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error { t.cm.XLock(blk) - buff := t.bufferList.Buffer(blk) - lsn := -1 var err error - if okToLog { - lsn, err = t.rm.SetCellValue(buff, key, val) - if err != nil { - return nil - } + err = t.Pin(blk) + if err != nil { + return err } + buff := t.bufferList.Buffer(blk) + lsn := -1 cellKey := key cell := kfile.NewKVCell(cellKey) p := buff.Contents() @@ -126,6 +142,18 @@ func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okTo if err != nil { return fmt.Errorf("failed to pin block %v: %w", blk, err) } - buff.MarkModified(t.txtnum, lsn) + buff.MarkModified(t.txNum, lsn) + if okToLog { + lsn, err = t.rm.SetCellValue(buff, key, val) + if err != nil { + return err + } + } + return nil } + +// GetTxNum is required by the TxInterface. +func (t *Mgr) GetTxNum() int64 { + return t.nextTxNum +} diff --git a/transaction/transaction_test.go b/transaction/transaction_test.go new file mode 100644 index 0000000..e2a5096 --- /dev/null +++ b/transaction/transaction_test.go @@ -0,0 +1,199 @@ +package transaction + +import ( + "os" + "path/filepath" + "testing" + "time" + "ultraSQL/buffer" + "ultraSQL/kfile" + "ultraSQL/log" + "ultraSQL/log_record" +) + +func TestLogRecordLifecycle(t *testing.T) { + // Setup: create a temporary directory for the file manager. + tempDir := filepath.Join(os.TempDir(), "simpledb_test_"+time.Now().Format("20060102150405")) + blockSize := 400 + fm, err := kfile.NewFileMgr(tempDir, blockSize) + if err != nil { + t.Fatalf("Failed to create FileMgr: %v", err) + } + // Clean up after the test. + defer func() { + fm.Close() + os.RemoveAll(tempDir) + }() + + // Initialize a small LRU policy and a BufferMgr with capacity 2. + policy := buffer.InitLRU(2, fm) + bm := buffer.NewBufferMgr(fm, 2, policy) + + // Create a LogMgr (assuming it takes a FileMgr, BufferMgr, and log file name). + lm, err := log.NewLogMgr(fm, bm, "log_test.db") + if err != nil { + t.Fatalf("Failed to create LogMgr: %v", err) + } + + // Define expected op codes. + const ( + CHECKPOINT = iota + START + COMMIT + ROLLBACK + SETINT + SETSTRING + UNIFIEDUPDATE + ) + + // Test the full log record lifecycle. + t.Run("Full log record lifecycle", func(t *testing.T) { + // 1. Write several records. + // Here we create a slice of expected records with their txNum and op code. + expectedRecords := []struct { + txNum int64 + op int32 + }{ + {1, START}, + {1, COMMIT}, + {2, START}, + {2, ROLLBACK}, + } + + // For each expected record, create a log record (using your log_record package) + // and append it to the log manager. + for _, expected := range expectedRecords { + var record log_record.Ilog_record // assuming LogRecord is your interface + switch expected.op { + case START: + record = log_record.NewStartRecord(expected.txNum) + case COMMIT: + record = log_record.NewCommitRecord(expected.txNum) + case ROLLBACK: + record = log_record.NewRollbackRecord(expected.txNum) + // You can add additional cases if needed. + } + + // Append the serialized record. + _, _, err := lm.Append(record.ToBytes()) + if err != nil { + t.Fatalf("Failed to append record: %v", err) + } + } + + // 2. Flush the log so that all records are persisted. + if err := lm.Flush(); err != nil { + t.Fatalf("Failed to flush log: %v", err) + } + + // 3. Create an iterator and read back the records. + iter, err := lm.Iterator() + if err != nil { + t.Fatalf("Failed to create log iterator: %v", err) + } + + // We expect the records in reverse order (assuming the log iterator returns records in LIFO order). + recordCount := len(expectedRecords) - 1 + for iter.HasNext() { + recordData, err := iter.Next() + if err != nil { + t.Fatalf("Failed to get next record: %v", err) + } + + record := log_record.CreateLogRecord(recordData) + if record == nil { + t.Fatalf("Failed to create record from data") + } + + if recordCount < 0 { + t.Errorf("Found more records than expected") + break + } + + expected := expectedRecords[recordCount] + if record.Op() != expected.op { + t.Errorf("Record %d: expected op %v, got %v", recordCount, expected.op, record.Op()) + } + if record.TxNumber() != expected.txNum { + t.Errorf("Record %d: expected txNum %v, got %v", recordCount, expected.txNum, record.TxNumber()) + } + recordCount-- + } + + if recordCount >= 0 { + t.Errorf("Expected %d records, found %d", len(expectedRecords), len(expectedRecords)-recordCount-1) + } + }) +} + +func TestTransactionManagerLifecycle(t *testing.T) { + // Create a temporary directory for file storage. + tempDir := filepath.Join(os.TempDir(), "ultraSQL_test_"+time.Now().Format("20060102150405")) + blockSize := 8192 + + // Create a file manager. + fm, err := kfile.NewFileMgr(tempDir, blockSize) + if err != nil { + t.Fatalf("Failed to create FileMgr: %v", err) + } + defer func() { + fm.Close() + os.RemoveAll(tempDir) + }() + + // Create a small buffer pool (capacity 2) using an LRU policy. + policy := buffer.InitLRU(2, fm) + bm := buffer.NewBufferMgr(fm, 2, policy) + + // Create a log manager. + lm, err := log.NewLogMgr(fm, bm, "log_test.db") + if err != nil { + t.Fatalf("Failed to create LogMgr: %v", err) + } + + // Create the transaction manager. + txMgr := NewTransaction(fm, lm, bm) + if txMgr == nil { + t.Fatal("Transaction manager is nil") + } + + // Optionally, print initial txMgr state. + t.Logf("Created TransactionMgr with txnum=%d", txMgr.txNum) + + // Test Commit: it should not return an error. + if err := txMgr.Commit(); err != nil { + t.Errorf("Commit returned error: %v", err) + } + + // Test Rollback: it should not return an error. + if err := txMgr.Rollback(); err != nil { + t.Errorf("Rollback returned error: %v", err) + } + + // Test InsertCell: + // Create a dummy block (for example, "testfile" and block number 0). + blk := kfile.NewBlockId("testfile", 0) + // Lock the block using the concurrency manager. + if err := txMgr.cm.XLock(*blk); err != nil { + t.Errorf("Failed to acquire XLock on block %v: %v", blk, err) + } + + // Insert a cell with a dummy key and value. + key := []byte("testkey") + val := "testvalue" + if err := txMgr.InsertCell(*blk, key, val, true); err != nil { + t.Errorf("InsertCell returned error: %v", err) + } else { + t.Log("InsertCell succeeded.") + } + + // Optionally, test finding the cell. + cell := txMgr.FindCell(*blk, key) + if cell == nil { + t.Errorf("Failed to find cell with key %s in block %v", key, blk) + } else { + t.Logf("Found cell with key %s in block %v", key, blk) + } + + // Additional tests (Recover, Pin/Unpin, etc.) can be added here. +} diff --git a/txinterface/txinterface.go b/txinterface/txinterface.go new file mode 100644 index 0000000..65db24f --- /dev/null +++ b/txinterface/txinterface.go @@ -0,0 +1,10 @@ +package txinterface + +import "ultraSQL/kfile" + +type TxInterface interface { + GetTxNum() int64 + Pin(blk kfile.BlockId) error + UnPin(blk kfile.BlockId) error + InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error +}