Skip to content

Commit 349cf96

Browse files
committed
buffer copies
1 parent 216a620 commit 349cf96

File tree

8 files changed

+128
-35
lines changed

8 files changed

+128
-35
lines changed

recordio/async_writer.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package recordio
55
import (
66
"github.com/godzie44/go-uring/uring"
77
"os"
8-
"sync/atomic"
98
)
109

1110
// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close.
@@ -28,13 +27,19 @@ func (w *AsyncWriter) Write(p []byte) (int, error) {
2827
}
2928
}
3029

31-
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), p, w.offset), 0, 0)
30+
// TODO(thomas): we have to make a defensive copy of p here, which is not optimal;
31+
// the reason is the buffer pooling (or the header reuse). It so happens that the original backing array was written
32+
// a couple of times before the ring was submitted. That caused some funny offsets to be written, which eventually made reading fail.
33+
pc := make([]byte, len(p))
34+
copy(pc, p)
35+
36+
err := w.ring.QueueSQE(uring.Write(w.file.Fd(), pc, w.offset), 0, 0)
3237
if err != nil {
3338
return 0, err
3439
}
3540

36-
atomic.AddInt32(&w.submittedSQEs, 1)
37-
atomic.AddUint64(&w.offset, uint64(len(p)))
41+
w.submittedSQEs++
42+
w.offset += uint64(len(p))
3843

3944
return len(p), nil
4045
}
@@ -58,7 +63,7 @@ func (w *AsyncWriter) submitAwaitOne() error {
5863
return err
5964
}
6065

61-
atomic.AddInt32(&w.submittedSQEs, -1)
66+
w.submittedSQEs--
6267
w.ring.SeenCQE(cqe)
6368

6469
err = cqe.Error()
@@ -92,20 +97,20 @@ func (w *AsyncWriter) Close() error {
9297
return w.file.Close()
9398
}
9499

95-
func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, error) {
100+
func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, *os.File, error) {
96101
ring, err := uring.New(numRingEntries, opts...)
97102
if err != nil {
98-
return nil, err
103+
return nil, nil, err
99104
}
100105

101106
writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
102107
if err != nil {
103-
return nil, err
108+
return nil, nil, err
104109
}
105110

106111
err = ring.RegisterFiles([]int{int(writeFile.Fd())})
107112
if err != nil {
108-
return nil, err
113+
return nil, nil, err
109114
}
110115

111116
writer := &AsyncWriter{
@@ -114,5 +119,5 @@ func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupO
114119
ring: ring,
115120
}
116121

117-
return writer, nil
122+
return writer, writeFile, nil
118123
}

recordio/async_writer_test.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
package recordio
44

55
import (
6+
"github.com/stretchr/testify/assert"
67
"github.com/stretchr/testify/require"
7-
"github.com/thomasjungblut/go-sstables/recordio/iouring"
88
"io/ioutil"
9+
"os"
910
"testing"
1011
)
1112

1213
func TestAsyncWriter_HappyPath(t *testing.T) {
13-
ok, err := iouring.IsIOUringAvailable()
14+
ok, err := IsIOUringAvailable()
1415
require.NoError(t, err)
1516
if !ok {
1617
t.Skip("iouring not available here")
@@ -21,13 +22,68 @@ func TestAsyncWriter_HappyPath(t *testing.T) {
2122
require.NoError(t, err)
2223
defer closeCleanFile(t, temp)
2324

24-
writer, err := NewAsyncWriter(temp.Name(), 4)
25+
writer, file, err := NewAsyncWriter(temp.Name(), 4)
2526
require.NoError(t, err)
27+
require.NotNil(t, file)
2628

27-
for i := 0; i < 10000; i++ {
28-
_, err = writer.Write(randomRecordOfSize(1024))
29+
var expected []byte
30+
for i := 0; i < 100; i++ {
31+
s := randomRecordOfSize(10)
32+
_, err = writer.Write(s)
2933
require.NoError(t, err)
34+
expected = append(expected, s...)
3035
}
3136

3237
require.NoError(t, writer.Close())
38+
fileContentEquals(t, file, expected)
39+
}
40+
41+
func TestAsyncWriter_GuardAgainstBufferReuse(t *testing.T) {
42+
ok, err := IsIOUringAvailable()
43+
require.NoError(t, err)
44+
if !ok {
45+
t.Skip("iouring not available here")
46+
return
47+
}
48+
49+
temp, err := ioutil.TempFile("", "TestAsyncWriter_GuardAgainstBufferReuse")
50+
require.NoError(t, err)
51+
defer closeCleanFile(t, temp)
52+
53+
writer, file, err := NewAsyncWriter(temp.Name(), 4)
54+
require.NoError(t, err)
55+
require.NotNil(t, file)
56+
57+
reusedSlice := []byte{13, 06, 91}
58+
// we are writing the same slice, three times before a forced flush due to capacity
59+
writeBuf(t, writer, reusedSlice)
60+
writeBuf(t, writer, reusedSlice)
61+
writeBuf(t, writer, reusedSlice)
62+
// fourth time we change the slice in-place
63+
reusedSlice[0] = 29
64+
writeBuf(t, writer, reusedSlice)
65+
writeBuf(t, writer, reusedSlice)
66+
require.NoError(t, writer.Close())
67+
68+
fileContentEquals(t, file, []byte{
69+
13, 06, 91,
70+
13, 06, 91,
71+
13, 06, 91,
72+
29, 06, 91,
73+
29, 06, 91,
74+
})
75+
}
76+
77+
func fileContentEquals(t *testing.T, file *os.File, expectedContent []byte) {
78+
f, err := os.Open(file.Name())
79+
require.NoError(t, err)
80+
all, err := ioutil.ReadAll(f)
81+
require.NoError(t, err)
82+
assert.Equal(t, expectedContent, all)
83+
}
84+
85+
// writeBuf writes buf to writer and fails the test if the write returns an error.
// It additionally asserts that the reported number of written bytes equals len(buf).
func writeBuf(t *testing.T, writer WriteCloserFlusher, buf []byte) {
	// report failures at the calling test line instead of inside this helper
	t.Helper()

	written, err := writer.Write(buf)
	require.NoError(t, err)
	assert.Equal(t, len(buf), written)
}

recordio/file_writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte {
8888
}
8989

9090
// for legacy reference still around, main paths unused - mostly for tests writing old versions
91-
//noinspection GoUnusedFunction
91+
// noinspection GoUnusedFunction
9292
func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) {
9393
// 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes
9494
bytes := make([]byte, RecordHeaderSizeBytes)

recordio/file_writer_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ func TestWriterInitNoPath(t *testing.T) {
158158
assert.Equal(t, errors.New("NewFileWriter: either os.File or string path must be supplied, never both"), err)
159159
}
160160

161+
func TestWriterDirectIOAndIOUringDisabled(t *testing.T) {
162+
_, err := NewFileWriter(Path("/tmp/abc"), DirectIO(), IOUring(4))
163+
assert.Equal(t, errors.New("NewFileWriter: either directIO or io_uring must be enabled, never both"), err)
164+
}
165+
161166
func TestWriterCrashCreatesValidHeader(t *testing.T) {
162167
tmpFile, err := ioutil.TempFile("", "recordio_CrashCreatesValidHeader")
163168
require.Nil(t, err)

recordio/io_uring.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package recordio
2+
3+
import (
4+
"github.com/godzie44/go-uring/uring"
5+
"os"
6+
)
7+
8+
// IOUringFactory creates writers that are backed by an io_uring.
// It stores the ring configuration that is handed to NewAsyncWriter on every CreateNewWriter call.
type IOUringFactory struct {
9+
// numRingEntries is the number of ring entries passed to NewAsyncWriter.
numRingEntries uint32
10+
// opts are the uring setup options passed to NewAsyncWriter.
opts []uring.SetupOption
11+
}
12+
13+
// CreateNewReader is not implemented yet: calling it always panics with "implement me".
// Neither filePath nor bufSize is used.
func (f *IOUringFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) {
14+
//TODO implement me
15+
panic("implement me")
16+
}
17+
18+
func (f *IOUringFactory) CreateNewWriter(filePath string, _ int) (*os.File, WriteCloserFlusher, error) {
19+
writer, file, err := NewAsyncWriter(filePath, f.numRingEntries, f.opts...)
20+
if err != nil {
21+
return nil, nil, err
22+
}
23+
24+
return file, writer, nil
25+
}
26+
27+
func NewIOUringFactory(numRingEntries uint32, opts ...uring.SetupOption) *IOUringFactory {
28+
return &IOUringFactory{
29+
numRingEntries: numRingEntries,
30+
opts: opts,
31+
}
32+
}
33+
34+
// IsIOUringAvailable tests whether io_uring is supported by the kernel.
35+
// It will return (true, nil) if that's the case, if it's not available it will be (false, nil).
36+
// Any other error will be indicated by the error (either true/false).
37+
func IsIOUringAvailable() (available bool, err error) {
38+
ring, err := uring.New(1)
39+
defer func() {
40+
err = ring.Close()
41+
}()
42+
43+
return err == nil, err
44+
}

recordio/iouring/iouring_test.go renamed to recordio/io_uring_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package iouring
1+
package recordio
22

33
import (
44
"github.com/stretchr/testify/require"

recordio/iouring/iouring.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

recordio/recordio_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) {
6161
return
6262
}
6363

64-
tmpFile, err := ioutil.TempFile("", "recordio_EndToEnd")
64+
tmpFile, err := ioutil.TempFile("", "recordio_EndToEndDirectIO")
6565
require.NoError(t, err)
6666
defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }()
6767
writer, err := NewFileWriter(File(tmpFile), DirectIO())

0 commit comments

Comments
 (0)