Skip to content

Commit eda6800

Browse files
committed
fix(rt): fix CRT read channel buffer management
AbstractBufferedReadChannel incorrectly updated the buffer offset and consumed counts when in a read-suspend loop from `readFully(dest: ByteArray, ...). This caused "holes" in the consumer buffer as well as truncated data where last N bytes were all zeroes. fixes: #526
1 parent 430f1f8 commit eda6800

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

aws-runtime/http-client-engine-crt/common/src/aws/sdk/kotlin/runtime/http/engine/crt/AbstractBufferedReadChannel.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,10 @@ internal abstract class AbstractBufferedReadChannel(
187187
throw ClosedReceiveChannelException("Unexpeced EOF: expected $remaining more bytes")
188188
}
189189

190-
consumed += readAsMuchAsPossible(dest, currOffset, remaining)
191-
currOffset += consumed
192-
remaining -= consumed
190+
val rc = readAsMuchAsPossible(dest, currOffset, remaining)
191+
consumed += rc
192+
currOffset += rc
193+
remaining = length - consumed
193194
} while (remaining > 0)
194195
}
195196

aws-runtime/http-client-engine-crt/common/test/aws/sdk/kotlin/runtime/http/engine/crt/BufferedReadChannelTest.kt

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ package aws.sdk.kotlin.runtime.http.engine.crt
88
import aws.sdk.kotlin.crt.io.byteArrayBuffer
99
import aws.smithy.kotlin.runtime.io.readByte
1010
import aws.smithy.kotlin.runtime.testing.ManualDispatchTestBase
11-
import kotlinx.coroutines.CancellationException
12-
import kotlinx.coroutines.DelicateCoroutinesApi
13-
import kotlinx.coroutines.ExperimentalCoroutinesApi
14-
import kotlinx.coroutines.GlobalScope
15-
import kotlinx.coroutines.async
11+
import aws.smithy.kotlin.runtime.util.Sha256
12+
import aws.smithy.kotlin.runtime.util.encodeToHex
13+
import aws.smithy.kotlin.runtime.util.sha256
14+
import kotlinx.coroutines.*
1615
import kotlinx.coroutines.channels.ClosedReceiveChannelException
17-
import kotlinx.coroutines.launch
1816
import kotlinx.coroutines.test.runTest
19-
import kotlinx.coroutines.yield
17+
import kotlin.random.Random
18+
import kotlin.test.*
2019
import kotlin.test.AfterTest
2120
import kotlin.test.Test
2221
import kotlin.test.assertEquals
@@ -469,4 +468,81 @@ class BufferedReadChannelTest : ManualDispatchTestBase() {
469468
readJob.await()
470469
assertEquals(1_000_000, totalBytes)
471470
}
471+
472+
@OptIn(DelicateCoroutinesApi::class)
473+
private suspend fun runReadSuspendIntegrityTest(reader: suspend (BufferedReadChannel, Int) -> String) {
474+
// writer is setup to write random lengths and delay to cause the reader to enter a suspend loop
475+
val data = ByteArray(16 * 1024 * 1024) { it.toByte() }
476+
var totalBytes = 0
477+
val channel = bufferedReadChannel { size -> totalBytes += size }
478+
val writeSha256 = GlobalScope.async {
479+
var wcRemaining = data.size
480+
var offset = 0
481+
val checksum = Sha256()
482+
while (wcRemaining > 0) {
483+
// random write sizes
484+
val wc = minOf(wcRemaining, Random.nextInt(256, 8 * 1024))
485+
val slice = data.sliceArray(offset until offset + wc)
486+
checksum.update(slice)
487+
channel.write(slice)
488+
offset += wc
489+
wcRemaining -= wc
490+
491+
if (wcRemaining % 256 == 0) {
492+
delay(Random.nextLong(0, 10))
493+
}
494+
}
495+
496+
channel.close()
497+
498+
checksum.digest().encodeToHex()
499+
}
500+
501+
val readSha256 = GlobalScope.async { reader(channel, data.size) }
502+
503+
val origSha = data.sha256().encodeToHex()
504+
val writeSha = writeSha256.await()
505+
val readSha = readSha256.await()
506+
assertEquals(origSha, writeSha)
507+
assertEquals(origSha, readSha)
508+
}
509+
510+
@Test
511+
fun testReadFullyIntegrity() = runTest {
512+
// see https://github.com/awslabs/aws-sdk-kotlin/issues/526
513+
runReadSuspendIntegrityTest { channel, totalSize ->
514+
val dest = ByteArray(totalSize)
515+
channel.readFully(dest)
516+
dest.sha256().encodeToHex()
517+
}
518+
}
519+
520+
@Test
521+
fun testReadAvailableIntegrity() = runTest {
522+
runReadSuspendIntegrityTest { channel, totalSize ->
523+
val checksum = Sha256()
524+
var totalRead = 0
525+
while (!channel.isClosedForRead) {
526+
val chunk = ByteArray(8 * 1024)
527+
val rc = channel.readAvailable(chunk)
528+
if (rc < 0) break
529+
530+
totalRead += rc
531+
val slice = if (rc != chunk.size) chunk.sliceArray(0 until rc) else chunk
532+
checksum.update(slice)
533+
}
534+
535+
assertEquals(totalSize, totalRead)
536+
checksum.digest().encodeToHex()
537+
}
538+
}
539+
540+
@Test
541+
fun testReadRemainingIntegrity() = runTest {
542+
runReadSuspendIntegrityTest { channel, totalSize ->
543+
val data = channel.readRemaining()
544+
assertEquals(totalSize, data.size)
545+
data.sha256().encodeToHex()
546+
}
547+
}
472548
}

0 commit comments

Comments
 (0)