Skip to content

Commit ed7d584

Browse files
committed
Merge branch 'cassandra-4.0' into cassandra-4.1
* cassandra-4.0: Fix memory leak in BufferPoolAllocator when a capacity needs to be extended
2 parents e4ea61c + 3f079b2 commit ed7d584

File tree

4 files changed

+97
-45
lines changed

4 files changed

+97
-45
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
33
* Redact security-sensitive information in system_views.settings (CASSANDRA-20856)
44
Merged from 4.0:
5+
* Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753)
56
* Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created (CASSANDRA-20570)
67
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)
78
* Fixed incorrect error message constant for keyspace name length validation (CASSANDRA-20915)

src/java/org/apache/cassandra/net/BufferPoolAllocator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ public long usedSizeInBytes()
9191
return bufferPool.usedSizeInBytes();
9292
}
9393

94+
@VisibleForTesting
95+
long overflowMemoryInBytes()
96+
{
97+
return bufferPool.overflowMemoryInBytes();
98+
}
99+
94100
void release()
95101
{
96102
}
@@ -117,6 +123,7 @@ public ByteBuf capacity(int newCapacity)
117123

118124
ByteBuf newBuffer = super.capacity(newCapacity);
119125
ByteBuffer nioBuffer = newBuffer.nioBuffer(0, newBuffer.capacity());
126+
nioBuffer = bufferPool.unwrapBufferPoolManagedBuffer(nioBuffer);
120127

121128
bufferPool.put(wrapped);
122129
wrapped = nioBuffer;

src/java/org/apache/cassandra/utils/memory/BufferPool.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,4 +1627,30 @@ int unsafeNumChunks()
16271627
+ (pool.chunks.chunk1 != null ? 1 : 0)
16281628
+ (pool.chunks.chunk2 != null ? 1 : 0);
16291629
}
1630+
1631+
/**
1632+
* @return the inner buffer if it has a BufferPool.Chunk attached
1633+
* and originalBuffer in other cases
1634+
*/
1635+
public ByteBuffer unwrapBufferPoolManagedBuffer(ByteBuffer originalBuffer)
1636+
{
1637+
int MAX_DEPTH = 32; // a protection against possible loops in attachments
1638+
int depth = 0;
1639+
ByteBuffer buffer = originalBuffer;
1640+
do
1641+
{
1642+
if (buffer == null || !isExactlyDirect(buffer))
1643+
return originalBuffer;
1644+
if (Chunk.getParentChunk(buffer) != null)
1645+
return buffer;
1646+
1647+
Object attachment = MemoryUtil.getAttachment(buffer);
1648+
if (!(attachment instanceof ByteBuffer))
1649+
return originalBuffer;
1650+
buffer = (ByteBuffer) attachment;
1651+
depth++;
1652+
}
1653+
while (depth < MAX_DEPTH);
1654+
return originalBuffer;
1655+
}
16301656
}

test/unit/org/apache/cassandra/net/BufferPoolAllocatorTest.java

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,57 +22,69 @@
2222
import java.util.Arrays;
2323
import java.util.Random;
2424

25+
import org.junit.BeforeClass;
2526
import org.junit.Test;
2627

2728
import io.netty.buffer.ByteBuf;
29+
import org.apache.cassandra.config.DataStorageSpec;
2830
import org.apache.cassandra.config.DatabaseDescriptor;
31+
import org.assertj.core.api.Assertions;
2932

3033
import static org.junit.Assert.assertArrayEquals;
3134
import static org.junit.Assert.assertEquals;
3235

3336
public class BufferPoolAllocatorTest
3437
{
35-
@Test
36-
public void testAdoptedBufferContentAfterResize() {
38+
39+
@BeforeClass
40+
public static void beforeClass()
41+
{
3742
DatabaseDescriptor.clientInitialization();
38-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
39-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
43+
// cache size hould be more than a macro chunk size for proper pool testing
44+
// if it is 0 or less than a macro chunk size we actually do not pool
45+
DatabaseDescriptor.getRawConfig().networking_cache_size = new DataStorageSpec.IntMebibytesBound(128);
46+
}
4047

48+
@Test
49+
public void testAdoptedBufferContentAfterResize() {
50+
ByteBuf buffer = allocateByteBuf(200, 500);
51+
int originalCapacity = buffer.capacity();
4152
byte[] content = new byte[300];
4253

4354
Random rand = new Random();
4455
rand.nextBytes(content);
4556

4657
buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
47-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
58+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
4859

4960
buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));
61+
int increasedCapacity = buffer.capacity();
62+
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
5063

5164
byte[] bufferContent = new byte[300];
5265

5366
BufferPoolAllocator.Wrapped wrapped = (BufferPoolAllocator.Wrapped) buffer;
5467
ByteBuffer adopted = wrapped.adopt();
5568
adopted.get(bufferContent);
5669
assertArrayEquals(content, bufferContent);
57-
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
70+
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
5871

5972
GlobalBufferPoolAllocator.instance.put(adopted);
60-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
73+
ensureThatAllMemoryIsReturnedBackToBufferPool();
6174
}
6275

6376
@Test
6477
public void testAdoptedBufferContentBeforeResize() {
65-
DatabaseDescriptor.clientInitialization();
66-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
67-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
78+
ByteBuf buffer = allocateByteBuf(200, 300);
79+
int originalCapacity = buffer.capacity();
6880

6981
byte[] content = new byte[200];
7082

7183
Random rand = new Random();
7284
rand.nextBytes(content);
7385

7486
buffer.writeBytes(content);
75-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
87+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
7688

7789
byte[] bufferContent = new byte[200];
7890

@@ -82,115 +94,121 @@ public void testAdoptedBufferContentBeforeResize() {
8294
assertArrayEquals(content, bufferContent);
8395

8496
GlobalBufferPoolAllocator.instance.put(adopted);
85-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
97+
ensureThatAllMemoryIsReturnedBackToBufferPool();
8698
}
8799

88100
@Test
89101
public void testPutPooledBufferBackIntoPool() {
90-
DatabaseDescriptor.clientInitialization();
91-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
92-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
102+
ByteBuf buffer = allocateByteBuf(200, 500);
93103
buffer.writeBytes(new byte[200]);
94104

95105
buffer.release();
96-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
106+
ensureThatAllMemoryIsReturnedBackToBufferPool();
97107
}
98108

99109
@Test
100110
public void testPutResizedBufferBackIntoPool() {
101-
DatabaseDescriptor.clientInitialization();
102-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
103-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
111+
ByteBuf buffer = allocateByteBuf(200, 500);
104112
buffer.writeBytes(new byte[500]);
105113

106114
buffer.release();
107-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
115+
ensureThatAllMemoryIsReturnedBackToBufferPool();
108116
}
109117

110118
@Test
111119
public void testBufferDefaultMaxCapacity()
112120
{
113-
DatabaseDescriptor.clientInitialization();
114121
ByteBuf noMaxCapacity = GlobalBufferPoolAllocator.instance.buffer(100);
115122
noMaxCapacity.writeBytes(new byte[100]);
116123
assertEquals(100, noMaxCapacity.readableBytes());
117124
noMaxCapacity.release();
118-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
125+
ensureThatAllMemoryIsReturnedBackToBufferPool();
119126
}
120127

121128
@Test
122129
public void testBufferWithMaxCapacity()
123130
{
124-
DatabaseDescriptor.clientInitialization();
125-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 500);
131+
ByteBuf buffer = allocateByteBuf(100, 500);
126132
buffer.writeBytes(new byte[500]);
127133
assertEquals(500, buffer.readableBytes());
128-
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
134+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
129135
buffer.release();
130-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
136+
ensureThatAllMemoryIsReturnedBackToBufferPool();
131137
}
132138

133139
@Test
134140
public void testBufferContentAfterResize()
135141
{
136-
DatabaseDescriptor.clientInitialization();
137-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
138-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
142+
ByteBuf buffer = allocateByteBuf(200, 300);
143+
int originalCapacity = buffer.capacity();
139144

140145
byte[] content = new byte[300];
141-
142146
Random rand = new Random();
143147
rand.nextBytes(content);
144148

145149
buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
146-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
150+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
147151

148152
buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));
149153

150154
byte[] bufferContent = new byte[300];
151155
buffer.readBytes(bufferContent);
152156
assertArrayEquals(content, bufferContent);
153-
assertEquals(300, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
157+
Assertions.assertThat(buffer.capacity()).isGreaterThanOrEqualTo(300);
158+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
159+
154160
buffer.release();
155-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
161+
ensureThatAllMemoryIsReturnedBackToBufferPool();
162+
156163
}
157164

158165
@Test(expected = IndexOutOfBoundsException.class)
159166
public void testBufferExceedMaxCapacity()
160167
{
161-
DatabaseDescriptor.clientInitialization();
162-
ByteBuf maxCapacity = GlobalBufferPoolAllocator.instance.buffer(100, 200);
168+
ByteBuf maxCapacity = allocateByteBuf(100, 200);
163169
try
164170
{
165171
maxCapacity.writeBytes(new byte[300]);
166172
} finally {
167173
maxCapacity.release();
168-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
174+
ensureThatAllMemoryIsReturnedBackToBufferPool();
169175
}
170176
}
171177

172178
@Test
173179
public void testResizeBufferMultipleTimes()
174180
{
175-
DatabaseDescriptor.clientInitialization();
176-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 2000);
181+
ByteBuf buffer = allocateByteBuf(100, 2000);
177182
buffer.writeBytes(new byte[200]);
178183
assertEquals(200, buffer.readableBytes());
179-
assertEquals(256, buffer.capacity());
180-
assertEquals(256, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
184+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
181185

182186
buffer.writeBytes(new byte[100]);
183187
assertEquals(300, buffer.readableBytes());
184-
assertEquals(512, buffer.capacity());
185-
assertEquals(512, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
188+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
186189

187190
buffer.writeBytes(new byte[300]);
188191
assertEquals(600, buffer.readableBytes());
189-
assertEquals(1024, buffer.capacity());
190-
assertEquals(1024, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
192+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
191193

192194
buffer.release();
193-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
195+
ensureThatAllMemoryIsReturnedBackToBufferPool();
196+
}
197+
198+
private static ByteBuf allocateByteBuf(int initialCapacity, int maxCapacity)
199+
{
200+
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(initialCapacity, maxCapacity);
201+
int originalCapacity = buffer.capacity();
202+
203+
// BufferPool can allocate more capacity than requested to avoid fragmentation
204+
Assertions.assertThat(originalCapacity).isGreaterThanOrEqualTo(initialCapacity);
205+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
206+
return buffer;
194207
}
195208

209+
private static void ensureThatAllMemoryIsReturnedBackToBufferPool()
210+
{
211+
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
212+
assertEquals(0, GlobalBufferPoolAllocator.instance.overflowMemoryInBytes());
213+
}
196214
}

0 commit comments

Comments
 (0)