Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.2.0
-----
* Fix LEAK DETECTED errors during bulk read (CASSANALYTICS-87)
* Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84)
* Analytics job fails when source table has secondary indexes (CASSANALYTICS-86)
* Set KeyStore to be optional (CASSANALYTICS-69)
Expand Down
1 change: 1 addition & 0 deletions cassandra-five-zero-bridge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dependencies {
testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4')
testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}")

testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}")
testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public final class SummaryDbUtils
{
public static class Summary
public static class Summary implements AutoCloseable
{
private final IndexSummary indexSummary;
private final DecoratedKey firstKey;
Expand Down Expand Up @@ -70,6 +70,12 @@ public DecoratedKey last()
{
return lastKey;
}

@Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly.
public void close() throws Exception
{
indexSummary.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful with this, I remember trying this once before but it resulted in seg. faults (SIGSEGV).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ref. counted in the org.apache.cassandra.io.util.Memory class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I will double check. Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fixed in 61add5b

}
}

private SummaryDbUtils()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import java.util.stream.IntStream;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import com.github.valfirst.slf4jtest.TestLogger;
import com.github.valfirst.slf4jtest.TestLoggerFactory;
import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
Expand All @@ -52,6 +55,13 @@
public class SSTableCacheTests
{
private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation();
private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref");

@AfterEach
void cleanup()
{
REF_LOGGER.clear();
}

// CHECKSTYLE IGNORE: Long method
@Test
Expand Down Expand Up @@ -191,4 +201,35 @@ public void testCache()
}
});
}

@Test
void testNoLeakDetectedError() throws Exception
{
Partitioner partitioner = Partitioner.Murmur3Partitioner;
try (TemporaryDirectory directory = new TemporaryDirectory())
{
// Write an SSTable
TestSchema schema = TestSchema.basic(BRIDGE);
schema.writeSSTable(directory, BRIDGE, partitioner,
writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index)));
SSTable sstable = TestSSTable.firstIn(directory.path());
TableMetadata metadata = new SchemaBuilder(schema.createStatement,
schema.keyspace,
new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
ImmutableMap.of("replication_factor", 1)),
partitioner).tableMetaData();

SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable);
assertThat(summary).isNotNull();
assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue();
SSTableCache.INSTANCE.invalidate(sstable);
summary = null;
assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse();
// trigger GC and wait a bit before asserting LEAK DETECTED is not logged.
System.gc();
Thread.sleep(1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: We could use Awaitility, if we consider more places that need waiting for asynchronous condition. Kafka developers do not pull the dependency (even though only for test), but implemented a simple function to do the same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Uninterruptibles. I was under the impression that we removed guava from the repo. But looks like we still have them as test dependency.

System.gc();
assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty();
}
}
}
1 change: 1 addition & 0 deletions cassandra-four-zero-bridge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dependencies {
testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4')
testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}")

testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}")
testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -107,6 +108,28 @@ private <T> Cache<SSTable, T> buildCache(int size, int expireAfterMins)
return CacheBuilder.newBuilder()
.expireAfterAccess(expireAfterMins, TimeUnit.MINUTES)
.maximumSize(size)
.removalListener(notification -> {
// The function is to eliminate the LEAK DETECTED errors.
// How it happens:
// 1. AutoCloseable objects (e.g. IndexSummary and BloomFilter) are evicted from cache
// 2. JVM GC and the close method is not called explicitly to reduce the reference count
// 3. Reference-Reaper thread release the object and print the LEAK DETECTED error
// The function fixes it by closing the object when evicting.
Object val = notification.getValue();
if (val instanceof AutoCloseable)
{
String typeLiteral = val.getClass().getName();
try
{
LOGGER.debug("Evicting auto-closable of type: {}", typeLiteral);
((AutoCloseable) val).close();
}
catch (Exception e)
{
LOGGER.error("Exception closing cached instance of {}", typeLiteral, e);
}
}
})
Comment on lines +118 to +139
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If commenting it out, the new test fails with "LEAK DETECTED" errors

.build();
}

Expand Down Expand Up @@ -211,4 +234,14 @@ private static IOException toIOException(Throwable throwable)
IOException ioException = ThrowableUtils.rootCause(throwable, IOException.class);
return ioException != null ? ioException : new IOException(ThrowableUtils.rootCause(throwable));
}

@VisibleForTesting
void invalidate(SSTable sstable)
{
summary.invalidate(sstable);
index.invalidate(sstable);
stats.invalidate(sstable);
filter.invalidate(sstable);
compressionMetadata.invalidate(sstable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public final class SummaryDbUtils
{
public static class Summary
public static class Summary implements AutoCloseable
{
private final IndexSummary indexSummary;
private final DecoratedKey firstKey;
Expand Down Expand Up @@ -68,6 +68,12 @@ public DecoratedKey last()
{
return lastKey;
}

@Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly.
public void close() throws Exception
{
indexSummary.close();
}
}

private SummaryDbUtils()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.stream.IntStream;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import com.github.valfirst.slf4jtest.TestLogger;
import com.github.valfirst.slf4jtest.TestLoggerFactory;
import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.Descriptor;
Expand All @@ -50,6 +53,13 @@
public class SSTableCacheTests
{
private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation();
private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref");

@AfterEach
void cleanup()
{
REF_LOGGER.clear();
}

@Test
public void testCache()
Expand Down Expand Up @@ -159,4 +169,35 @@ public void testCache()
}
});
}

@Test
void testNoLeakDetectedError() throws Exception
{
Partitioner partitioner = Partitioner.Murmur3Partitioner;
try (TemporaryDirectory directory = new TemporaryDirectory())
{
// Write an SSTable
TestSchema schema = TestSchema.basic(BRIDGE);
schema.writeSSTable(directory, BRIDGE, partitioner,
writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index)));
SSTable sstable = TestSSTable.firstIn(directory.path());
TableMetadata metadata = new SchemaBuilder(schema.createStatement,
schema.keyspace,
new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
ImmutableMap.of("replication_factor", 1)),
partitioner).tableMetaData();

SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable);
assertThat(summary).isNotNull();
assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue();
SSTableCache.INSTANCE.invalidate(sstable);
summary = null;
assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse();
// trigger GC and wait a bit before asserting LEAK DETECTED is not logged.
System.gc();
Thread.sleep(1000);
System.gc();
assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty();
}
}
}
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ scala=2.12
spark=3
kryoVersion=4.0.2
slf4jApiVersion=1.7.30
slf4jTestVersion=2.3.0
guavaVersion=16.0.1
# force version 4.5.1 of vertx to prevent issues initializing io.vertx.core.json.jackson.JacksonCodec,
# which requires a newer version of jackson, which is not available in spark 2
Expand Down