@codeflash-ai codeflash-ai bot commented Nov 13, 2025

📄 109% (1.09x) speedup for EmbedMaxDct.infer_dct_svd in invokeai/backend/image_util/imwatermark/vendor.py

⏱️ Runtime : 17.7 milliseconds → 8.43 milliseconds (best of 46 runs)
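
The headline percentage and the raw runtime ratio measure different things, which is easy to misread. The sketch below assumes the speedup is computed as (original - optimized) / optimized; that formula is an assumption inferred from the reported numbers, not something this report states.

```python
# Assumption: speedup = (original - optimized) / optimized.
original_ms = 17.7   # reported original runtime
optimized_ms = 8.43  # reported optimized runtime

runtime_ratio = original_ms / optimized_ms              # how many times faster
speedup = (original_ms - optimized_ms) / optimized_ms   # fractional gain over the new runtime

print(f"ratio: {runtime_ratio:.2f}x, speedup: {speedup:.0%}")
```

With the rounded runtimes shown above, the ratio is about 2.1x and the gain is about 110%; the reported 109% presumably comes from unrounded timings.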

📝 Explanation and details

The optimized code achieves a 109% speedup by making a single change to the SVD computation in the `infer_dct_svd` method.

**Key optimization**: The original code computes the full decomposition with `u, s, v = np.linalg.svd(cv2.dct(block))` but uses only the singular values `s`. The optimized version calls `s = np.linalg.svd(dct_block, compute_uv=False)`, which returns only the singular values and skips computing the U and V matrices.
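
A minimal sketch of the two calls side by side, using a random matrix as a stand-in for `cv2.dct(block)` (the stand-in and the variable names are assumptions made to keep the example NumPy-only):

```python
import numpy as np

rng = np.random.default_rng(0)
# Stand-in for cv2.dct(block); any 2-D float matrix works for the comparison.
dct_block = rng.random((8, 8)).astype(np.float32)

# Original form: full decomposition, U and V are computed and then discarded.
_, s_full, _ = np.linalg.svd(dct_block)

# Optimized form: singular values only.
s_only = np.linalg.svd(dct_block, compute_uv=False)

# Both calls return the singular values in descending order.
same = np.allclose(s_full, s_only, rtol=1e-5, atol=1e-5)
print("singular values match:", same)
```

Only `s[0]` feeds the watermark decision, so nothing downstream needs U or V.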

**Why this leads to speedup**: A full SVD of an n x n matrix costs O(n³) operations. Computing only the singular values is still O(n³), but with a smaller constant, because NumPy no longer has to form the orthogonal matrices U and V. That reduces both computation time and memory allocation. The line profiler shows the time spent on the SVD line dropping from 25.1 ms (90.6% of total time) to 11.5 ms (79% of total time).

Performance impact: The optimization is particularly effective for larger blocks, as evidenced by the test results:

  • Small blocks (4x4): ~20% faster
  • Medium blocks (8x8): ~40-50% faster
  • Large blocks (32x32, 64x64): ~90-190% faster

The speedup grows with block size because the work saved by not forming U and V grows with the matrix dimensions, while small blocks are likely dominated by fixed per-call overhead. The change is therefore most valuable for image processing workloads that use larger DCT blocks or process many blocks in a batch.
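
The block-size trend can be checked locally with a rough timing sketch like the one below. The helper name `time_svd` and the chosen sizes are illustrative assumptions; absolute numbers depend on the machine and the BLAS/LAPACK build, so they will not match the figures in this report.

```python
import timeit

import numpy as np


def time_svd(n, repeats=20):
    """Return (full, values_only) seconds per call for a random n x n float32 matrix."""
    rng = np.random.default_rng(n)
    m = rng.random((n, n)).astype(np.float32)
    full = timeit.timeit(lambda: np.linalg.svd(m), number=repeats) / repeats
    vals = timeit.timeit(lambda: np.linalg.svd(m, compute_uv=False), number=repeats) / repeats
    return full, vals


for n in (4, 8, 32, 64):
    full, vals = time_svd(n)
    print(f"{n:>2}x{n:<2}  full={full * 1e6:8.1f} us  values_only={vals * 1e6:8.1f} us")
```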

The optimization preserves the original return values while eliminating unnecessary computation, so it is a performance win with no functional trade-offs.
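
One way to spot-check that claim is to decode the bit both ways on random inputs. This is a sketch under assumptions: `infer_bit` is a hypothetical helper that mirrors the `int((s[0] % scale) > scale * 0.5)` expression used in the generated tests, and the random matrices stand in for `cv2.dct(block)`.

```python
import numpy as np


def infer_bit(dct_block, scale, compute_uv):
    """Hypothetical helper: decode one bit from the largest singular value."""
    if compute_uv:
        _, s, _ = np.linalg.svd(dct_block)
    else:
        s = np.linalg.svd(dct_block, compute_uv=False)
    return int((s[0] % scale) > scale * 0.5)


rng = np.random.default_rng(42)
mismatches = 0
for _ in range(200):
    dct_block = rng.random((8, 8)) * 255  # stand-in for cv2.dct(block)
    if infer_bit(dct_block, 25, True) != infer_bit(dct_block, 25, False):
        mismatches += 1

print("mismatches:", mismatches)
```

The two LAPACK code paths could in principle differ in the last few bits of `s[0]`, which would only matter if `s[0] % scale` sat almost exactly on the threshold.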

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 627 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |

🌀 Generated Regression Tests and Runtime
# imports
import cv2
import numpy as np
import pytest  # used for our unit tests
from invokeai.backend.image_util.imwatermark.vendor import EmbedMaxDct

# unit tests

# ---- Basic Test Cases ----

def test_basic_binary_outcome_0():
    """
    Test that infer_dct_svd returns 0 for a block where the largest singular value modulo scale is below threshold.
    """
    embedder = EmbedMaxDct()
    # Create a block of zeros; DCT and SVD will yield singular values of zero.
    block = np.zeros((4, 4), dtype=np.float32)
    scale = 10
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 72.8μs -> 62.8μs (16.0% faster)

def test_basic_binary_outcome_1():
    """
    Test that infer_dct_svd returns 1 for a block where the largest singular value modulo scale is above threshold.
    """
    embedder = EmbedMaxDct()
    # A constant block has a single nonzero DCT coefficient at [0,0].
    block = np.ones((4, 4), dtype=np.float32) * 20
    scale = 10
    # For a 4x4 block of 20s, DCT[0,0] = 80, so s[0] = 80 and 80 % 10 = 0.
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 46.8μs -> 39.8μs (17.5% faster)
    # A constant block of 25s gives s[0] = 100, and 100 % 10 is also 0,
    # so try a non-uniform block with a smaller scale instead.
    block = np.eye(4, dtype=np.float32) * 7
    scale = 5
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 25.6μs -> 22.4μs (14.3% faster)

def test_basic_typical_block():
    """
    Test with a typical non-uniform block and a common scale.
    """
    embedder = EmbedMaxDct()
    block = np.array([
        [10, 20, 30, 40],
        [50, 60, 70, 80],
        [90, 100, 110, 120],
        [130, 140, 150, 160]
    ], dtype=np.float32)
    scale = 36
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 45.3μs -> 37.8μs (19.8% faster)

# ---- Edge Test Cases ----

def test_edge_scale_zero():
    """
    Test with scale = 0, which should raise ZeroDivisionError or produce an error.
    """
    embedder = EmbedMaxDct()
    block = np.ones((4, 4), dtype=np.float32)
    scale = 0
    with pytest.raises(ZeroDivisionError):
        embedder.infer_dct_svd(block, scale)

def test_edge_block_size_1x1():
    """
    Test with a 1x1 block, the smallest possible block.
    """
    embedder = EmbedMaxDct()
    block = np.array([[42]], dtype=np.float32)
    scale = 10
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 65.6μs -> 54.8μs (19.7% faster)

def test_edge_negative_scale():
    """
    Test with a negative scale value.
    """
    embedder = EmbedMaxDct()
    block = np.ones((4, 4), dtype=np.float32)
    scale = -10
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 50.6μs -> 37.5μs (34.7% faster)

def test_edge_negative_block_values():
    """
    Test with negative values in the block.
    """
    embedder = EmbedMaxDct()
    block = -np.ones((4, 4), dtype=np.float32) * 15
    scale = 7
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 43.9μs -> 33.5μs (30.9% faster)

def test_edge_non_square_block():
    """
    Test with a non-square block (should raise an error from cv2.dct).
    """
    embedder = EmbedMaxDct()
    block = np.ones((4, 3), dtype=np.float32)
    scale = 10
    with pytest.raises(cv2.error):
        embedder.infer_dct_svd(block, scale)

def test_edge_non_float_block():
    """
    Test with a block that starts as an integer type and is cast to float32 before the call.
    """
    embedder = EmbedMaxDct()
    block = np.ones((4, 4), dtype=np.int32) * 5
    scale = 3
    # cv2.dct expects float32 or float64 input, so cast explicitly before calling
    codeflash_output = embedder.infer_dct_svd(block.astype(np.float32), scale); result = codeflash_output # 54.5μs -> 47.8μs (13.9% faster)

def test_edge_large_singular_value():
    """
    Test with a block that will produce a very large singular value.
    """
    embedder = EmbedMaxDct()
    block = np.ones((4, 4), dtype=np.float32) * 1e6
    scale = 999999
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 42.0μs -> 37.3μs (12.6% faster)

# ---- Large Scale Test Cases ----

def test_large_block_32x32():
    """
    Test with a large 32x32 block.
    """
    embedder = EmbedMaxDct()
    block = np.random.rand(32, 32).astype(np.float32) * 255
    scale = 50
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 196μs -> 102μs (90.9% faster)

def test_large_block_64x64():
    """
    Test with a 64x64 block, the largest block size used in these tests.
    """
    embedder = EmbedMaxDct()
    block = np.random.rand(64, 64).astype(np.float32) * 100
    scale = 77
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 716μs -> 248μs (189% faster)

def test_many_blocks_performance():
    """
    Run infer_dct_svd on 500 random 8x8 blocks to simulate batch processing.
    """
    embedder = EmbedMaxDct()
    scale = 25
    for i in range(500):
        block = np.random.randint(0, 255, (8, 8)).astype(np.float32)
        codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 13.2ms -> 5.88ms (125% faster)

def test_large_scale_value():
    """
    Test with a very large scale value.
    """
    embedder = EmbedMaxDct()
    block = np.random.rand(4, 4).astype(np.float32) * 1000
    scale = 10**8
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 57.7μs -> 47.1μs (22.4% faster)

def test_large_block_and_scale():
    """
    Test with both a large block and a large scale value.
    """
    embedder = EmbedMaxDct()
    block = np.random.rand(32, 32).astype(np.float32) * 1e6
    scale = 10**6
    codeflash_output = embedder.infer_dct_svd(block, scale); result = codeflash_output # 244μs -> 97.1μs (152% faster)

# ---- Determinism Test ----

def test_determinism():
    """
    Test that the function is deterministic for the same input.
    """
    embedder = EmbedMaxDct()
    block = np.random.rand(4, 4).astype(np.float32) * 100
    scale = 13
    codeflash_output = embedder.infer_dct_svd(block, scale); result1 = codeflash_output # 49.9μs -> 35.3μs (41.6% faster)
    codeflash_output = embedder.infer_dct_svd(block, scale); result2 = codeflash_output # 25.1μs -> 14.5μs (72.8% faster)

# ---- Input Validation Test ----

def test_invalid_block_type():
    """
    Test that passing a non-numpy array raises an error.
    """
    embedder = EmbedMaxDct()
    block = [[1, 2], [3, 4]]
    scale = 5
    with pytest.raises(TypeError):
        embedder.infer_dct_svd(block, scale)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

# imports
import cv2
import numpy as np
import pytest  # used for our unit tests
from invokeai.backend.image_util.imwatermark.vendor import EmbedMaxDct


# unit tests
class TestEmbedMaxDctInferDctSvd:
    # ----- Basic Test Cases -----

    def test_simple_block_scale_1(self):
        # Test with a simple 2x2 block and scale=1
        block = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
        scale = 1
        emb = EmbedMaxDct()
        # s[0] is always >=0; s[0] % 1 is always in [0,1)
        # So (s[0] % 1) > 0.5 may be True or False depending on s[0]
        # Let's compute it:
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 22.1μs -> 18.1μs (22.5% faster)

    def test_simple_block_scale_5(self):
        # Test with a simple 2x2 block and scale=5
        block = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
        scale = 5
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 18.8μs -> 15.5μs (21.0% faster)

    def test_block_with_zeros(self):
        # Test with a block of all zeros
        block = np.zeros((4, 4), dtype=np.float32)
        scale = 10
        emb = EmbedMaxDct()
        # DCT of zeros is zeros; SVD of zeros: s[0] == 0
        expected = int((0 % scale) > scale * 0.5)  # always 0
        codeflash_output = emb.infer_dct_svd(block, scale) # 40.6μs -> 29.2μs (39.0% faster)

    def test_block_with_negative_values(self):
        # Test with negative values in the block
        block = np.array([[-1, -2], [-3, -4]], dtype=np.float32)
        scale = 2
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 17.5μs -> 14.9μs (17.7% faster)

    def test_block_with_large_values(self):
        # Test with large values in the block
        block = np.full((4, 4), 1e6, dtype=np.float32)
        scale = 100
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 19.8μs -> 15.3μs (29.7% faster)

    # ----- Edge Test Cases -----

    def test_scale_is_zero(self):
        # Test with scale=0 (should raise ZeroDivisionError)
        block = np.eye(4, dtype=np.float32)
        scale = 0
        emb = EmbedMaxDct()
        with pytest.raises(ZeroDivisionError):
            emb.infer_dct_svd(block, scale)

    def test_block_is_1x1(self):
        # Test with a 1x1 block
        block = np.array([[42.0]], dtype=np.float32)
        scale = 5
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 19.2μs -> 16.2μs (18.9% faster)

    def test_block_is_non_square(self):
        # Test with a non-square block (2x3)
        block = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32)
        scale = 3
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 20.0μs -> 16.7μs (19.8% faster)

    def test_block_with_nan(self):
        # Test with NaN values in the block
        block = np.array([[np.nan, 2], [3, 4]], dtype=np.float32)
        scale = 2
        emb = EmbedMaxDct()
        # SVD or DCT with nan will result in nan in s[0], so (nan % scale) is nan, and comparison is always False
        codeflash_output = emb.infer_dct_svd(block, scale); result = codeflash_output

    def test_block_with_inf(self):
        # Test with inf in the block
        block = np.array([[np.inf, 2], [3, 4]], dtype=np.float32)
        scale = 3
        emb = EmbedMaxDct()
        # DCT or SVD with inf will result in inf in s[0], (inf % scale) is nan, so comparison is False
        codeflash_output = emb.infer_dct_svd(block, scale); result = codeflash_output # 66.1μs -> 56.7μs (16.6% faster)

    def test_block_with_mixed_inf_nan(self):
        # Test with both inf and nan
        block = np.array([[np.nan, np.inf], [3, 4]], dtype=np.float32)
        scale = 4
        emb = EmbedMaxDct()
        codeflash_output = emb.infer_dct_svd(block, scale); result = codeflash_output

    def test_block_with_dtype_int(self):
        # Test with integer dtype
        block = np.array([[1, 2], [3, 4]], dtype=np.int32)
        scale = 2
        emb = EmbedMaxDct()
        # cv2.dct expects float32 or float64, so this should raise an error
        with pytest.raises(cv2.error):
            emb.infer_dct_svd(block, scale) # 41.8μs -> 39.3μs (6.27% faster)

    def test_block_with_dtype_float64(self):
        # Test with float64 dtype
        block = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float64)
        scale = 2
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 15.2μs -> 14.3μs (6.89% faster)

    def test_scale_is_float(self):
        # Test with a float scale
        block = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
        scale = 2.5
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 16.8μs -> 15.5μs (8.54% faster)

    # ----- Large Scale Test Cases -----

    def test_large_block(self):
        # Test with a large 32x32 block
        block = np.random.rand(32, 32).astype(np.float32)
        scale = 10
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 144μs -> 74.6μs (94.1% faster)

    def test_large_block_high_scale(self):
        # Test with a large block and high scale
        block = np.random.rand(64, 64).astype(np.float32)
        scale = 999
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 768μs -> 298μs (157% faster)

    def test_many_blocks(self):
        # Test many blocks in a loop to simulate batch processing
        emb = EmbedMaxDct()
        scale = 7
        for i in range(50):  # keep under 1000 for performance
            block = np.random.rand(8, 8).astype(np.float32) * i
            s = np.linalg.svd(cv2.dct(block))[1]
            expected = int((s[0] % scale) > scale * 0.5)
            codeflash_output = emb.infer_dct_svd(block, scale) # 1.31ms -> 878μs (48.7% faster)

    def test_large_scale_float(self):
        # Test with a large float scale
        block = np.random.rand(16, 16).astype(np.float32)
        scale = 123.456
        emb = EmbedMaxDct()
        s = np.linalg.svd(cv2.dct(block))[1]
        expected = int((s[0] % scale) > scale * 0.5)
        codeflash_output = emb.infer_dct_svd(block, scale) # 66.6μs -> 41.9μs (58.9% faster)

    # ----- Determinism Test -----

    def test_deterministic_output(self):
        # The function should be deterministic for the same input
        block = np.random.rand(8, 8).astype(np.float32)
        scale = 4
        emb = EmbedMaxDct()
        codeflash_output = emb.infer_dct_svd(block, scale); result1 = codeflash_output # 52.3μs -> 40.6μs (28.7% faster)
        codeflash_output = emb.infer_dct_svd(block, scale); result2 = codeflash_output # 31.9μs -> 21.8μs (46.0% faster)

    # ----- Invalid Input Test -----

    def test_non_array_block(self):
        # Test with a non-numpy-array block (e.g., list of lists)
        block = [[1, 2], [3, 4]]
        scale = 2
        emb = EmbedMaxDct()
        # cv2.dct expects numpy array, so this should raise an error
        with pytest.raises(cv2.error):
            emb.infer_dct_svd(block, scale) # 22.7μs -> 23.6μs (3.75% slower)

    def test_empty_block(self):
        # Test with an empty block
        block = np.array([], dtype=np.float32).reshape(0, 0)
        scale = 2
        emb = EmbedMaxDct()
        # cv2.dct of empty array raises an error
        with pytest.raises(cv2.error):
            emb.infer_dct_svd(block, scale)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
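
The NaN and inf edge cases above rely on how NumPy's floating-point remainder and comparison behave. The sketch below illustrates only that arithmetic, assuming `s[0]` ends up as `inf` or `nan`; whether `np.linalg.svd` returns or raises for such input is not covered here.

```python
import numpy as np

scale = 3
with np.errstate(invalid="ignore"):  # silence the expected "invalid value" warning
    rem_inf = np.float32(np.inf) % scale  # remainder of inf is nan
    rem_nan = np.float32(np.nan) % scale  # nan propagates
    # Any comparison with nan is False, so the decoded bit is 0 in both cases.
    bit_inf = int(rem_inf > scale * 0.5)
    bit_nan = int(rem_nan > scale * 0.5)

print(rem_inf, rem_nan, bit_inf, bit_nan)
```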

To edit these changes, run `git checkout codeflash/optimize-EmbedMaxDct.infer_dct_svd-mhwyrdsb` and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 13, 2025 05:03
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Nov 13, 2025