-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20975 Add capability to load pluggable compression service providers #4513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.cassandra.io.compress; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Set; | ||
|
|
||
| public abstract class AbstractCompressorDecorator implements ICompressor | ||
| { | ||
| protected ICompressor baseCompressor; | ||
| public AbstractCompressorDecorator(ICompressor compressor) | ||
| { | ||
| this.baseCompressor = compressor; | ||
| } | ||
|
|
||
| @Override | ||
| public int initialCompressedBufferLength(int chunkLength) | ||
| { | ||
| return baseCompressor.initialCompressedBufferLength(chunkLength); | ||
| } | ||
|
|
||
| @Override | ||
| public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException | ||
| { | ||
| return baseCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); | ||
| } | ||
|
|
||
| @Override | ||
| public void compress(ByteBuffer input, ByteBuffer output) throws IOException | ||
| { | ||
| baseCompressor.compress(input, output); | ||
| } | ||
|
|
||
| @Override | ||
| public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException | ||
| { | ||
| baseCompressor.uncompress(input, output); | ||
| } | ||
|
|
||
| @Override | ||
| public BufferType preferredBufferType() { | ||
| return baseCompressor.preferredBufferType(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean supports(BufferType bufferType) { | ||
| return baseCompressor.supports(bufferType); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<String> supportedOptions() { | ||
| return baseCompressor.supportedOptions(); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -177,7 +177,7 @@ public ICompressor compressor() | |
| result = resolvedCompressor; | ||
| if (result == null) | ||
| { | ||
| result = resolveCompressor(parameters.getSstableCompressor(), compressionDictionary); | ||
| result = resolveCompressor(parameters.getDecoratedSstableCompressor(), compressionDictionary); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
| resolvedCompressor = result; | ||
| } | ||
| return result; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.cassandra.io.compress; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
||
| /** | ||
| * This class uses a plugin compressor to perform compress/decompress, if available, otherwise reverts to default SstableCompressor. | ||
| */ | ||
| public class CompressorDecorator extends AbstractCompressorDecorator | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think that controlling the execution flow with try/catch in below methods is a good idea. Perhaps you might decide if it is possible to use your plugin as you are getting it from the factory? This is very uncomfortable and probably also not performance friendly when we do uncompress on plugin every single time and then falling back to base compressor.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the discussion on ML it seems we will go with the fallback logic after all. In that case we need to log (in a non-spamming manner) + introduce metrics and update them on each failure.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good. I will look into adding the metrics |
||
| { | ||
| private ICompressor pluginCompressor; | ||
| public CompressorDecorator(ICompressor baseCompressor, ICompressor pluginCompressor) | ||
| { | ||
| super(baseCompressor); | ||
| this.pluginCompressor = pluginCompressor; | ||
| } | ||
|
|
||
| @Override | ||
| public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException | ||
| { | ||
| try | ||
| { | ||
| return pluginCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); | ||
| } | ||
| catch(IOException e) | ||
| { | ||
| return super.uncompress(input, inputOffset, inputLength, output, outputOffset); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void compress(ByteBuffer input, ByteBuffer output) throws IOException | ||
| { | ||
| try | ||
| { | ||
| pluginCompressor.compress(input, output); | ||
| } | ||
| catch(IOException e) | ||
| { | ||
| super.compress(input, output); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException | ||
| { | ||
| try | ||
| { | ||
| pluginCompressor.uncompress(input, output); | ||
| } | ||
| catch(IOException e) | ||
| { | ||
| super.uncompress(input, output); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.cassandra.io.compress; | ||
|
|
||
| import java.util.Map; | ||
| import com.google.common.collect.ImmutableMap; | ||
|
|
||
| public interface ICompressorFactory | ||
| { | ||
| public static final ImmutableMap<String, String> COMPRESSOR_NAME_MAP = ImmutableMap.of( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is unused and can be removed, also not correct as such, we would need to be way more robust than this in general.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We would need to use some reflection to scan classes in I am also not completely sure if all such compression algorithms support your hardware speedup. In that case we would need to some filter them more. Not sure, waiting on your feedback.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intention behind using the map was to have a simple name to represent the algorithm like, |
||
| "deflate", "DeflateCompressor", | ||
| "lz4", "LZ4Compressor", | ||
| "snappy", "SnappyCompressor", | ||
| "zstd", "ZstdCompressor" | ||
| ); | ||
|
|
||
| /** | ||
| * Used to create a plugin compressor | ||
| */ | ||
| public ICompressor createCompressor(Map<String, String> options) throws IllegalStateException; | ||
|
|
||
| /** | ||
| * Services can use COMPRESSOR_NAME_MAP to associate with an existing compressor | ||
| */ | ||
| public String getSupportedCompressorName(); | ||
shyla226 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ | |
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.ServiceLoader; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Objects; | ||
|
|
@@ -40,10 +42,14 @@ | |
| import org.apache.cassandra.io.util.DataOutputPlus; | ||
| import org.apache.cassandra.net.MessagingService; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static java.lang.String.format; | ||
|
|
||
| public final class CompressionParams | ||
| { | ||
| private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you log in the implementation as suggested then this might go away (with imports too) |
||
| public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; | ||
| public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the | ||
| // new compression parameter we can't use a | ||
|
|
@@ -71,6 +77,7 @@ public final class CompressionParams | |
| Collections.emptyMap()); | ||
|
|
||
| private final ICompressor sstableCompressor; | ||
| private final ICompressor decoratedSstableCompressor; | ||
| private final int chunkLength; | ||
| private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch. | ||
| private final double minCompressRatio; // In configuration we store min ratio, the input parameter. | ||
|
|
@@ -223,6 +230,7 @@ private CompressionParams(ICompressor sstableCompressor, int chunkLength, int ma | |
| this.otherOptions = ImmutableMap.copyOf(otherOptions); | ||
| this.minCompressRatio = minCompressRatio; | ||
| this.maxCompressedLength = maxCompressedLength; | ||
| this.decoratedSstableCompressor = decorateCompressor(sstableCompressor, otherOptions); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alignement
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we actually need Then sstableCompressor would be either decorated or not.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah..I will try this. I think you answered one of my questions |
||
| } | ||
|
|
||
| public CompressionParams copy() | ||
|
|
@@ -260,6 +268,15 @@ public ICompressor getSstableCompressor() | |
| return sstableCompressor; | ||
| } | ||
|
|
||
| /** | ||
| * Provides a decorated SSTable compressor, if a compression service is loaded. | ||
| * @return a decorated SSTable compressor or {@code getSstableCompressor()}. | ||
| */ | ||
| public ICompressor getDecoratedSstableCompressor() | ||
| { | ||
| return decoratedSstableCompressor; | ||
| } | ||
|
|
||
| public ImmutableMap<String, String> getOtherOptions() | ||
| { | ||
| return otherOptions; | ||
|
|
@@ -348,6 +365,48 @@ public static ICompressor createCompressor(ParameterizedClass compression) throw | |
| return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a decorated compressor, if compression service providers available in the classpath or | ||
| * returns the base compressor | ||
| * @param baseCompressor compressor being decorated | ||
| * @param options compression options of baseCompressor | ||
| * @return returns a decorated compressor, if service available, otherwise baseCompressor | ||
| */ | ||
| private static ICompressor decorateCompressor(ICompressor baseCompressor, Map<String, String> options) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this might be way simplified We can get rid of getServiceProviderFactory completely. Just return |
||
| { | ||
| if(baseCompressor != null) | ||
| { | ||
| try | ||
| { | ||
| Optional<ICompressorFactory> selectedFactory = getServiceProviderFactory(baseCompressor.getClass().getSimpleName()); | ||
| if (selectedFactory.isPresent()) | ||
| { | ||
| ICompressor pluginCompressor = selectedFactory.get().createCompressor(options); | ||
| return new CompressorDecorator(baseCompressor, pluginCompressor); | ||
| } | ||
| } | ||
| catch(IllegalStateException e) | ||
| { | ||
| logger.trace("Failed to access service provider. Will fallback to default!!"); | ||
| } | ||
| } | ||
| return baseCompressor; | ||
| } | ||
|
|
||
| /** | ||
| * Provides access to a factory to create plugin compressors, if available | ||
| * @param compressorName simple name of the compressor class | ||
| * @return an optional containing ICompressorFactory, if present, an empty optional otherwise | ||
| */ | ||
| private static Optional<ICompressorFactory> getServiceProviderFactory(String compressorName) | ||
| { | ||
| ServiceLoader<ICompressorFactory> loader = ServiceLoader.load(ICompressorFactory.class); | ||
| return loader.stream() | ||
| .filter(factory -> factory.get().getSupportedCompressorName().equals(compressorName)) | ||
| .map(ServiceLoader.Provider::get) | ||
| .findFirst(); | ||
| } | ||
|
|
||
| private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co) | ||
| { | ||
| if (co == null || co.isEmpty()) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alignement, plus we should most probably not use it like this. We should still use
getSstableCompressor()as it was before. It is implementation detail if the compressor is decorated or not.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. Reasoning for doing it this way,
sstableCompressorbeing final it can be changed only in thecreateCompressorfunction. But if I change it there,CompressionMetadataalso gets updated and stored.Any thoughts how I could overcome this issue?