Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions concurrency/concurrencyMgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package concurrency

import (
"fmt"
"sync"
"ultraSQL/kfile"
)

type ConcurrencyMgr struct {
lTble *LockTable
locks map[kfile.BlockId]string
mu sync.RWMutex // Protect shared map access
}

func NewConcurrencyMgr() *ConcurrencyMgr {
return &ConcurrencyMgr{
locks: make(map[kfile.BlockId]string),
}
}

func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error {
cM.mu.Lock()
defer cM.mu.Unlock()

// If we already have any lock (S or X), no need to acquire again
if _, exists := cM.locks[blk]; exists {
return nil
}

err := cM.lTble.sLock(blk)
if err != nil {
return fmt.Errorf("failed to acquire shared lock: %w", err)
}

cM.locks[blk] = "S"
return nil
}

func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error {
cM.mu.Lock()
defer cM.mu.Unlock()

// If we already have an X lock, no need to acquire again
if cM.hasXLock(blk) {
return nil
}

// Following the two-phase locking protocol:
// 1. First acquire S lock if we don't have any lock
if _, exists := cM.locks[blk]; !exists {
err := cM.lTble.sLock(blk)
if err != nil {
return fmt.Errorf("failed to acquire initial shared lock: %w", err)
}
cM.locks[blk] = "S"
}

// 2. Then upgrade to X lock
err := cM.lTble.xLock(blk)
if err != nil {
return fmt.Errorf("failed to upgrade to exclusive lock: %w", err)
}

cM.locks[blk] = "X"
return nil
}

func (cM *ConcurrencyMgr) Release() error {
cM.mu.Lock()
defer cM.mu.Unlock()

var errs []error
for blk := range cM.locks {
if err := cM.lTble.unlock(blk); err != nil {
errs = append(errs, fmt.Errorf("failed to release lock for block %v: %w", blk, err))
}
}

// Clear the locks map regardless of errors
cM.locks = make(map[kfile.BlockId]string)

if len(errs) > 0 {
return fmt.Errorf("errors during release: %v", errs)
}
return nil
}

func (cM *ConcurrencyMgr) hasXLock(blk kfile.BlockId) bool {
// Note: Caller must hold mutex
lockType, ok := cM.locks[blk]
return ok && lockType == "X"
}

// Helper method to check current lock status
func (cM *ConcurrencyMgr) GetLockType(blk kfile.BlockId) (string, bool) {
cM.mu.RLock()
defer cM.mu.RUnlock()

lockType, exists := cM.locks[blk]
return lockType, exists
}
114 changes: 114 additions & 0 deletions concurrency/lockTable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package concurrency

import (
"fmt"
"sync"
"time"
"ultraSQL/kfile"
)

const MaxWaitTime = 10 * time.Second

type LockTable struct {
locks map[kfile.BlockId]int // positive: number of shared locks, negative: exclusive lock
mu sync.RWMutex
cond *sync.Cond
}

func NewLockTable() *LockTable {
lt := &LockTable{
locks: make(map[kfile.BlockId]int),
}
lt.cond = sync.NewCond(&lt.mu)
return lt
}

func (lT *LockTable) sLock(blk kfile.BlockId) error {
lT.mu.Lock()
defer lT.mu.Unlock()

deadline := time.Now().Add(MaxWaitTime)

// Wait while there's an exclusive lock on the block
for lT.hasXLock(blk) {
if time.Now().After(deadline) {
return fmt.Errorf("shared lock acquisition timed out for block %v", blk)
}
lT.cond.Wait()
}

// Increment the number of shared locks (or initialize to 1)
val := lT.getLockVal(blk)
lT.locks[blk] = val + 1
return nil
}

func (lT *LockTable) xLock(blk kfile.BlockId) error {
lT.mu.Lock()
defer lT.mu.Unlock()

deadline := time.Now().Add(MaxWaitTime)

// Wait while there are other locks (shared or exclusive)
for lT.hasOtherLocks(blk) {
if time.Now().After(deadline) {
return fmt.Errorf("exclusive lock acquisition timed out for block %v", blk)
}
lT.cond.Wait()
}

// Set to -1 to indicate exclusive lock
lT.locks[blk] = -1
return nil
}

func (lT *LockTable) hasXLock(blk kfile.BlockId) bool {
return lT.getLockVal(blk) < 0
}

func (lT *LockTable) getLockVal(blk kfile.BlockId) int {
val, exists := lT.locks[blk]
if !exists {
return 0
}
return val
}

func (lT *LockTable) hasOtherLocks(blk kfile.BlockId) bool {
val := lT.getLockVal(blk)
return val != 0 && val != 1 // Allow upgrade from single shared lock
}

func (lT *LockTable) unlock(blk kfile.BlockId) error {
lT.mu.Lock()
defer lT.mu.Unlock()

val := lT.getLockVal(blk)
if val == 0 {
return fmt.Errorf("attempting to unlock block %v which is not locked", blk)
}

if val > 1 {
// Decrement shared lock count
lT.locks[blk] = val - 1
} else {
// Remove last shared lock or exclusive lock
delete(lT.locks, blk)
lT.cond.Broadcast() // Wake up waiting goroutines
}
return nil
}

// Helper method to get lock information
func (lT *LockTable) GetLockInfo(blk kfile.BlockId) (lockType string, count int) {
lT.mu.RLock()
defer lT.mu.RUnlock()

val := lT.getLockVal(blk)
if val < 0 {
return "exclusive", 1
} else if val > 0 {
return "shared", val
}
return "none", 0
}
2 changes: 1 addition & 1 deletion kfile/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewBlockId(filename string, blknum int) *BlockId {
}
}

func (b *BlockId) GetFileName() string {
func (b *BlockId) FileName() string {
return b.Filename
}

Expand Down
18 changes: 9 additions & 9 deletions kfile/fileMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (fm *FileMgr) PreallocateFile(blk *BlockId, size int64) error {
return err
}

filename := blk.GetFileName()
filename := blk.FileName()
if err := fm.validatePermissions(); err != nil {
return err
}
Expand All @@ -125,7 +125,7 @@ func (fm *FileMgr) validatePreallocationParams(blk *BlockId, size int64) error {
if size%int64(fm.blocksize) != 0 {
return fmt.Errorf("size must be a multiple of blocksize %d", fm.blocksize)
}
if blk.GetFileName() == "" {
if blk.FileName() == "" {
return fmt.Errorf("invalid filename")
}
return nil
Expand Down Expand Up @@ -193,14 +193,14 @@ func (fm *FileMgr) Read(blk *BlockId, p *SlottedPage) error {
fm.mutex.RLock()
defer fm.mutex.RUnlock()

f, err := fm.getFile(blk.GetFileName())
f, err := fm.getFile(blk.FileName())
if err != nil {
return fmt.Errorf("failed to get file for block %v: %w", blk, err)
}

offset := int64(blk.Number() * fm.blocksize)
if _, err = f.Seek(offset, io.SeekStart); err != nil {
return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err)
return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err)
}
bytesRead, err := f.Read(p.Contents())
if err != nil {
Expand All @@ -224,14 +224,14 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error {
fm.mutex.Lock()
defer fm.mutex.Unlock()

f, err := fm.getFile(blk.GetFileName())
f, err := fm.getFile(blk.FileName())
if err != nil {
return fmt.Errorf("failed to get file for block %v: %w", blk, err)
}

offset := int64(blk.Number() * fm.blocksize)
if _, err = f.Seek(offset, io.SeekStart); err != nil {
return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err)
return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err)
}
bytesWritten, err := f.Write(p.Contents())
if err != nil {
Expand All @@ -241,7 +241,7 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error {
return fmt.Errorf("incomplete write: expected %d bytes, wrote %d", fm.blocksize, bytesWritten)
}
if err = f.Sync(); err != nil {
return fmt.Errorf("failed to sync file %s: %w", blk.GetFileName(), err)
return fmt.Errorf("failed to sync file %s: %w", blk.FileName(), err)
}

fm.blocksWritten++
Expand Down Expand Up @@ -379,7 +379,7 @@ func (fm *FileMgr) WriteLog() []ReadWriteLogEntry {

// ensureFileSize ensures the file has at least the required number of blocks.
func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int) error {
currentBlocks, err := fm.Length(blk.GetFileName())
currentBlocks, err := fm.Length(blk.FileName())
if err != nil {
return err
}
Expand All @@ -399,7 +399,7 @@ func (fm *FileMgr) RenameFile(blk *BlockId, newFileName string) error {
return fmt.Errorf("invalid new filename: %s", newFileName)
}

oldFileName := blk.GetFileName()
oldFileName := blk.FileName()

// Close the old file if it is open.
fm.openFilesLock.Lock()
Expand Down
18 changes: 9 additions & 9 deletions kfile/file__dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestBlock(t *testing.T) {
Blknum := 5
blk := NewBlockId(Filename, Blknum)

if blk.GetFileName() != Filename {
t.Errorf("Expected Filename %s, got %s", Filename, blk.GetFileName())
if blk.FileName() != Filename {
t.Errorf("Expected Filename %s, got %s", Filename, blk.FileName())
}

if blk.Number() != Blknum {
Expand Down Expand Up @@ -126,13 +126,13 @@ func TestBlock(t *testing.T) {

// Test NextBlock
next := blk.NextBlock()
if next.Number() != 6 || next.GetFileName() != "test.db" {
if next.Number() != 6 || next.FileName() != "test.db" {
t.Error("NextBlock returned incorrect block")
}

// Test PrevBlock
prev := blk.PrevBlock()
if prev.Number() != 4 || prev.GetFileName() != "test.db" {
if prev.Number() != 4 || prev.FileName() != "test.db" {
t.Error("PrevBlock returned incorrect block")
}

Expand Down Expand Up @@ -372,8 +372,8 @@ func TestBlockId(t *testing.T) {
blknum := 5
blk := NewBlockId(filename, blknum)

if blk.GetFileName() != filename {
t.Errorf("Expected Filename %s, got %s", filename, blk.GetFileName())
if blk.FileName() != filename {
t.Errorf("Expected Filename %s, got %s", filename, blk.FileName())
}

if blk.Number() != blknum {
Expand Down Expand Up @@ -448,12 +448,12 @@ func TestBlockId(t *testing.T) {
blk := NewBlockId("test.db", 5)

next := blk.NextBlock()
if next.Number() != 6 || next.GetFileName() != "test.db" {
if next.Number() != 6 || next.FileName() != "test.db" {
t.Error("NextBlock returned incorrect block")
}

prev := blk.PrevBlock()
if prev.Number() != 4 || prev.GetFileName() != "test.db" {
if prev.Number() != 4 || prev.FileName() != "test.db" {
t.Error("PrevBlock returned incorrect block")
}

Expand Down Expand Up @@ -779,7 +779,7 @@ func TestFileRename(t *testing.T) {
t.Errorf("Could not rename file %s", err)
}
want := new_file
got := blk.GetFileName()
got := blk.FileName()
if want != got {
t.Errorf("want %s but got %s", want, got)
}
Expand Down
17 changes: 17 additions & 0 deletions log/IRecord.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package log

const (
CHECKPOINT = iota
START
COMMIT
ROLLBACK
SETINT
SETSTRING
)

type LogRecord interface {
Op() int
TxNumber() int
Undo(txNum int)
// Optionally: a method to serialize or convert to a Cell
}
Loading
Loading