@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;

public abstract class AbstractCompressorDecorator implements ICompressor
{
    protected ICompressor baseCompressor;
    public AbstractCompressorDecorator(ICompressor compressor)
    {
        this.baseCompressor = compressor;
    }

    @Override
    public int initialCompressedBufferLength(int chunkLength)
    {
        return baseCompressor.initialCompressedBufferLength(chunkLength);
    }

    @Override
    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
    {
        return baseCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
    }

    @Override
    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        baseCompressor.compress(input, output);
    }

    @Override
    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        baseCompressor.uncompress(input, output);
    }

    @Override
    public BufferType preferredBufferType() {
        return baseCompressor.preferredBufferType();
    }

    @Override
    public boolean supports(BufferType bufferType) {
        return baseCompressor.supports(bufferType);
    }

    @Override
    public Set<String> supportedOptions() {
        return baseCompressor.supportedOptions();
    }

}
@@ -109,7 +109,7 @@ public CompressedSequentialWriter(File file,
.bufferType(parameters.getSstableCompressor().preferredBufferType())
.finishOnClose(option.finishOnClose())
.build());
ICompressor compressor = parameters.getSstableCompressor();
Contributor @smiklosovic, Dec 15, 2025

Alignment. Also, we should most probably not use it like this. We should still use getSstableCompressor() as before; whether the compressor is decorated or not is an implementation detail.

Contributor Author

Agreed. The reason for doing it this way: sstableCompressor is final, so it can only be changed in the createCompressor function. But if I change it there, CompressionMetadata also gets updated and stored.
Any thoughts on how I could overcome this issue?

ICompressor compressor = parameters.getDecoratedSstableCompressor();
this.digestFile = Optional.ofNullable(digestFile);

// buffer for compression should be the same size as buffer itself
@@ -177,7 +177,7 @@ public ICompressor compressor()
result = resolvedCompressor;
if (result == null)
{
result = resolveCompressor(parameters.getSstableCompressor(), compressionDictionary);
result = resolveCompressor(parameters.getDecoratedSstableCompressor(), compressionDictionary);
Contributor

Same as above.

resolvedCompressor = result;
}
return result;
75 changes: 75 additions & 0 deletions src/java/org/apache/cassandra/io/compress/CompressorDecorator.java
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* This class uses a plugin compressor to perform compress/decompress, if available, otherwise reverts to default SstableCompressor.
*/
public class CompressorDecorator extends AbstractCompressorDecorator
Contributor

I do not think that controlling the execution flow with try/catch in the methods below is a good idea. Could you decide whether the plugin is usable at the point where you get it from the factory? As it stands, this is awkward and probably not performance friendly either: we attempt uncompress on the plugin every single time and only then fall back to the base compressor.

Contributor

Based on the discussion on the ML, it seems we will go with the fallback logic after all. In that case we need to log (in a non-spamming manner), introduce metrics, and update them on each failure.

Contributor Author

Sounds good. I will look into adding the metrics.
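For illustration, a minimal sketch of what a fallback path with a failure counter and rate-limited logging might look like. Everything here is an assumption made for the example, not part of the PR: `Codec` is a one-method stand-in for ICompressor, `FallbackCodec` plays the role of CompressorDecorator, the `AtomicLong` counter stands in for a real metric, and `System.err` stands in for the logger. The sketch also restores the buffer positions before retrying, on the assumption that a plugin could advance them before it fails.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ICompressor, reduced to one method.
interface Codec
{
    void compress(ByteBuffer input, ByteBuffer output) throws IOException;
}

// Tries the plugin first; on failure it counts the event, logs at most once
// per interval, and then retries with the base codec.
final class FallbackCodec implements Codec
{
    private final Codec base;
    private final Codec plugin;
    private final long logIntervalNanos;
    private final AtomicLong lastLogNanos;
    private final AtomicLong failures = new AtomicLong();    // stand-in for a real metric
    private final AtomicLong loggedLines = new AtomicLong(); // lets the demo observe the rate limit

    FallbackCodec(Codec base, Codec plugin, long logInterval, TimeUnit unit)
    {
        this.base = base;
        this.plugin = plugin;
        this.logIntervalNanos = unit.toNanos(logInterval);
        // Start one interval in the past so that the first failure is logged.
        this.lastLogNanos = new AtomicLong(System.nanoTime() - logIntervalNanos);
    }

    @Override
    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        int inputPosition = input.position();
        int outputPosition = output.position();
        try
        {
            plugin.compress(input, output);
        }
        catch (IOException e)
        {
            failures.incrementAndGet();
            maybeLog(e);
            // Undo any partial progress made by the plugin before retrying.
            input.position(inputPosition);
            output.position(outputPosition);
            base.compress(input, output);
        }
    }

    private void maybeLog(IOException e)
    {
        long now = System.nanoTime();
        long last = lastLogNanos.get();
        // Only the thread that wins the compare-and-set logs for this interval.
        if (now - last >= logIntervalNanos && lastLogNanos.compareAndSet(last, now))
        {
            loggedLines.incrementAndGet();
            System.err.println("Plugin compressor failed, falling back to base: " + e.getMessage());
        }
    }

    long failureCount() { return failures.get(); }

    long loggedLineCount() { return loggedLines.get(); }
}

final class FallbackDemo
{
    public static void main(String[] args) throws IOException
    {
        // A plugin that consumes one byte and then fails, and a base that just copies.
        Codec failing = (src, dst) -> { dst.put(src.get()); throw new IOException("device busy"); };
        Codec copying = (src, dst) -> dst.put(src);
        FallbackCodec codec = new FallbackCodec(copying, failing, 1, TimeUnit.HOURS);
        for (int i = 0; i < 3; i++)
            codec.compress(ByteBuffer.wrap(new byte[]{ 1, 2, 3 }), ByteBuffer.allocate(3));
        System.out.println(codec.failureCount() + " failures, " + codec.loggedLineCount() + " log line(s)");
    }
}
```

The counter is updated on every failure while the log line is emitted at most once per interval, which is one way to read "non-spamming". Whether positions need restoring in the real decorator depends on how the plugin behaves on failure, which the PR does not specify.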

{
    private ICompressor pluginCompressor;
    public CompressorDecorator(ICompressor baseCompressor, ICompressor pluginCompressor)
    {
        super(baseCompressor);
        this.pluginCompressor = pluginCompressor;
    }

    @Override
    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
    {
        try
        {
            return pluginCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
        }
        catch(IOException e)
        {
            return super.uncompress(input, inputOffset, inputLength, output, outputOffset);
        }
    }

    @Override
    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        try
        {
            pluginCompressor.compress(input, output);
        }
        catch(IOException e)
        {
            super.compress(input, output);
        }
    }

    @Override
    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        try
        {
            pluginCompressor.uncompress(input, output);
        }
        catch(IOException e)
        {
            super.uncompress(input, output);
        }
    }

}
42 changes: 42 additions & 0 deletions src/java/org/apache/cassandra/io/compress/ICompressorFactory.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.compress;

import java.util.Map;
import com.google.common.collect.ImmutableMap;

public interface ICompressorFactory
{
    public static final ImmutableMap<String, String> COMPRESSOR_NAME_MAP = ImmutableMap.of(
Contributor

This is unused and can be removed. It is also not correct as such; we would need to be far more robust than this in general.

Contributor @smiklosovic, Dec 15, 2025

public static final is redundant too. Would you mind shedding some light on the logic behind this?

We would need to use reflection to scan the classes in the org.apache.cassandra.io.compress package that are not abstract and implement ICompressor; otherwise we would need to update this map every time we add a new compressor implementation (ZstdDictionaryCompressor is already missing here).

I am also not completely sure whether all such compression algorithms support your hardware speedup. If they do not, we would need to filter them further. Not sure, waiting on your feedback.

Contributor Author

The intention behind the map was to have a simple name for each algorithm (deflate, lz4, etc.) and map it to a concrete class in Cassandra. Another option I can think of is to have the plugin factory return a concrete class name such as DeflateCompressor from getSupportedCompressorName, to specify which compressor it accelerates. Would that be all right?

    "deflate", "DeflateCompressor",
    "lz4", "LZ4Compressor",
    "snappy", "SnappyCompressor",
    "zstd", "ZstdCompressor"
    );

    /**
     * Used to create a plugin compressor
     */
    public ICompressor createCompressor(Map<String, String> options) throws IllegalStateException;

    /**
     * Services can use COMPRESSOR_NAME_MAP to associate with an existing compressor
     */
    public String getSupportedCompressorName();

}
59 changes: 59 additions & 0 deletions src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
@@ -40,10 +42,14 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.String.format;

public final class CompressionParams
{
private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class);
Contributor

If you log in the implementation as suggested, then this might go away (along with the imports).

public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the
// new compression parameter we can't use a
@@ -71,6 +77,7 @@ public final class CompressionParams
Collections.emptyMap());

private final ICompressor sstableCompressor;
private final ICompressor decoratedSstableCompressor;
private final int chunkLength;
private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch.
private final double minCompressRatio; // In configuration we store min ratio, the input parameter.
@@ -223,6 +230,7 @@ private CompressionParams(ICompressor sstableCompressor, int chunkLength, int ma
this.otherOptions = ImmutableMap.copyOf(otherOptions);
this.minCompressRatio = minCompressRatio;
this.maxCompressedLength = maxCompressedLength;
this.decoratedSstableCompressor = decorateCompressor(sstableCompressor, otherOptions);
Contributor

Alignment.

Contributor @smiklosovic, Dec 16, 2025

Do we actually need decoratedSstableCompressor? I propose doing something like

this.sstableCompressor = maybeDecorateCompressor(sstableCompressor, otherOptions);

Then sstableCompressor would be either decorated or not.

Contributor Author

Ah, I will try this. I think you answered one of my questions.
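For illustration, a minimal sketch of the single-field idea, together with one possible way to keep the decoration out of whatever gets persisted. This is an assumption-laden sketch, not the PR's code: `Codec`, `PlainCodec`, `DecoratedCodec`, `Params` and `persistedClassName` are hypothetical names, and the premise that the stored compressor name is derived from the compressor instance's class is an assumption about the serialization path rather than something shown in this diff.

```java
// Hypothetical stand-in for ICompressor, reduced to one method.
interface Codec
{
    byte[] compress(byte[] input);
}

// Hypothetical base compressor.
final class PlainCodec implements Codec
{
    @Override
    public byte[] compress(byte[] input)
    {
        return input.clone();
    }
}

// Wraps a base codec with a plugin and keeps the base reachable for unwrapping.
final class DecoratedCodec implements Codec
{
    private final Codec base;
    private final Codec plugin;

    DecoratedCodec(Codec base, Codec plugin)
    {
        this.base = base;
        this.plugin = plugin;
    }

    Codec base()
    {
        return base;
    }

    @Override
    public byte[] compress(byte[] input)
    {
        return plugin.compress(input);
    }
}

// Hypothetical stand-in for CompressionParams with a single compressor field.
final class Params
{
    private final Codec codec; // decorated or not; callers of getCodec() cannot tell

    Params(Codec base, Codec pluginOrNull)
    {
        this.codec = maybeDecorate(base, pluginOrNull);
    }

    private static Codec maybeDecorate(Codec base, Codec pluginOrNull)
    {
        return pluginOrNull == null ? base : new DecoratedCodec(base, pluginOrNull);
    }

    Codec getCodec()
    {
        return codec;
    }

    // Name to persist: unwrap first, so the decorator's class name is never stored.
    String persistedClassName()
    {
        Codec undecorated = codec instanceof DecoratedCodec ? ((DecoratedCodec) codec).base() : codec;
        return undecorated.getClass().getSimpleName();
    }
}

final class ParamsDemo
{
    public static void main(String[] args)
    {
        Params plain = new Params(new PlainCodec(), null);
        Params accelerated = new Params(new PlainCodec(), input -> input.clone());
        System.out.println(plain.persistedClassName());
        System.out.println(accelerated.persistedClassName());
    }
}
```

Both parameter objects report the base class name, so readers and writers share one getter while the persisted name stays independent of whether a plugin was found. Whether an unwrapping step like this fits the real metadata code would need to be checked against CompressionMetadata.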

}

public CompressionParams copy()
@@ -260,6 +268,15 @@ public ICompressor getSstableCompressor()
return sstableCompressor;
}

/**
* Provides a decorated SSTable compressor, if a compression service is loaded.
* @return a decorated SSTable compressor or {@code getSstableCompressor()}.
*/
public ICompressor getDecoratedSstableCompressor()
{
return decoratedSstableCompressor;
}

public ImmutableMap<String, String> getOtherOptions()
{
return otherOptions;
@@ -348,6 +365,48 @@ public static ICompressor createCompressor(ParameterizedClass compression) throw
return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters));
}

/**
* Creates a decorated compressor, if compression service providers available in the classpath or
* returns the base compressor
* @param baseCompressor compressor being decorated
* @param options compression options of baseCompressor
* @return returns a decorated compressor, if service available, otherwise baseCompressor
*/
private static ICompressor decorateCompressor(ICompressor baseCompressor, Map<String, String> options)
Contributor @smiklosovic, Dec 15, 2025

I think this could be simplified considerably:

    private static ICompressor decorateCompressor(ICompressor baseCompressor, Map<String, String> options)
    {
        return Optional.ofNullable(baseCompressor)
                       .flatMap(c -> ServiceLoader.load(ICompressorFactory.class)
                                                  .stream()
                                                  .map(ServiceLoader.Provider::get)
                                                  .filter(factory -> c.getClass().getSimpleName()
                                                                       .equals(factory.getSupportedCompressorName()))
                                                  .findFirst())
                       .flatMap(factory -> factory.createCompressor(options))
                       .map(pluginCompressor -> new CompressorDecorator(baseCompressor, pluginCompressor))
                       .orElse(null);
    }

We can get rid of getServiceProviderFactory completely. Just return Optional<ICompressor> from the factory's createCompressor. You are only catching that exception here and logging it; you can do the same in the implementation and return an empty Optional if instantiation is not possible.
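For illustration, a self-contained sketch of an Optional-returning factory feeding a decorate-or-fall-back chain. The names are assumptions made for the example: `Codec` stands in for ICompressor, `CodecFactory` for a variant of ICompressorFactory whose createCompressor returns an Optional, and the factories are passed in as a plain list (instead of being discovered through ServiceLoader) so the sketch runs without any provider registration. The chain ends in `orElse(base)`, matching the PR's decorateCompressor, which returns baseCompressor when no plugin applies.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ICompressor; methods omitted for brevity.
interface Codec
{
}

// Hypothetical factory variant: "cannot create" is an empty Optional, not an exception.
interface CodecFactory
{
    String supportedCompressorName();

    Optional<Codec> createCompressor(Map<String, String> options);
}

final class Lz4LikeCodec implements Codec
{
}

final class AcceleratedCodec implements Codec
{
}

final class DecoratedCodec implements Codec
{
    final Codec base;
    final Codec plugin;

    DecoratedCodec(Codec base, Codec plugin)
    {
        this.base = base;
        this.plugin = plugin;
    }
}

// Demo helper: a factory with a fixed name and a fixed result.
final class FixedFactory implements CodecFactory
{
    private final String name;
    private final Optional<Codec> result;

    FixedFactory(String name, Optional<Codec> result)
    {
        this.name = name;
        this.result = result;
    }

    @Override
    public String supportedCompressorName()
    {
        return name;
    }

    @Override
    public Optional<Codec> createCompressor(Map<String, String> options)
    {
        return result;
    }
}

final class Decorators
{
    // Returns a decorated codec if the first matching factory can create a plugin,
    // otherwise the base codec itself (or null if base is null).
    static Codec maybeDecorate(Codec base, List<CodecFactory> factories, Map<String, String> options)
    {
        return Optional.ofNullable(base)
                       .flatMap(b -> factories.stream()
                                              .filter(f -> b.getClass().getSimpleName().equals(f.supportedCompressorName()))
                                              .findFirst())
                       .flatMap(f -> f.createCompressor(options))
                       // The explicit <Codec> keeps the Optional typed as Codec so orElse(base) compiles.
                       .<Codec>map(plugin -> new DecoratedCodec(base, plugin))
                       .orElse(base);
    }
}

final class DecoratorsDemo
{
    public static void main(String[] args)
    {
        Codec base = new Lz4LikeCodec();
        CodecFactory usable = new FixedFactory("Lz4LikeCodec", Optional.of(new AcceleratedCodec()));
        CodecFactory unusable = new FixedFactory("Lz4LikeCodec", Optional.empty());
        System.out.println(Decorators.maybeDecorate(base, List.of(usable), Map.of()) instanceof DecoratedCodec);
        System.out.println(Decorators.maybeDecorate(base, List.of(unusable), Map.of()) == base);
    }
}
```

Note that `findFirst()` commits to the first factory whose name matches; if that factory returns an empty Optional, later factories are not tried. Whether that is the desired behaviour when several providers are on the classpath is a design decision the thread leaves open.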

{
    if(baseCompressor != null)
    {
        try
        {
            Optional<ICompressorFactory> selectedFactory = getServiceProviderFactory(baseCompressor.getClass().getSimpleName());
            if (selectedFactory.isPresent())
            {
                ICompressor pluginCompressor = selectedFactory.get().createCompressor(options);
                return new CompressorDecorator(baseCompressor, pluginCompressor);
            }
        }
        catch(IllegalStateException e)
        {
            logger.trace("Failed to access service provider. Will fallback to default!!");
        }
    }
    return baseCompressor;
}

/**
 * Provides access to a factory to create plugin compressors, if available
 * @param compressorName simple name of the compressor class
 * @return an optional containing ICompressorFactory, if present, an empty optional otherwise
 */
private static Optional<ICompressorFactory> getServiceProviderFactory(String compressorName)
{
    ServiceLoader<ICompressorFactory> loader = ServiceLoader.load(ICompressorFactory.class);
    return loader.stream()
                 .filter(factory -> factory.get().getSupportedCompressorName().equals(compressorName))
                 .map(ServiceLoader.Provider::get)
                 .findFirst();
}

private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
{
if (co == null || co.isEmpty())