Skip to content

providers/ldap: add SASL EXTERNAL support#21212

Open
BeryJu wants to merge 6 commits intomainfrom
providers/ldap/sasl-tls
Open

providers/ldap: add SASL EXTERNAL support#21212
BeryJu wants to merge 6 commits intomainfrom
providers/ldap/sasl-tls

Conversation

@BeryJu
Copy link
Copy Markdown
Member

@BeryJu BeryJu commented Mar 28, 2026

Allow authenticating to LDAP via certificate; uses MTLS stage and require enterprise as such

Based on BeryJu/ldap#5

@BeryJu BeryJu requested a review from a team as a code owner March 28, 2026 13:30
@netlify
Copy link
Copy Markdown

netlify bot commented Mar 28, 2026

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit 2005651
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/69d4fd6d6c0ea50008f142a7
😎 Deploy Preview https://deploy-preview-21212--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 28, 2026

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
3167 1 3166 1
View the top 2 failed test(s) by shortest run time
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_logout_search
Stack Traces | 15.9s run time
self = <unittest.case._Outcome object at 0x7f2aed35f950>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>
result = <TestCaseFunction test_ldap_bind_logout_search>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>
method = <bound method TestProviderLDAP.test_ldap_bind_logout_search of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/decorators.py:60: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>,)
kwargs = {}, config = <AuthentikTenantsConfig: authentik_tenants>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>,)
kwargs = {}, config = <AuthentikOutpostConfig: authentik_outposts>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_logout_search>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @reconcile_app("authentik_tenants")
    @reconcile_app("authentik_outposts")
    def test_ldap_bind_logout_search(self):
        """Test bind + session deletion -> failed search"""
        self._prepare()
        server = Server("ldap://localhost:3389", get_info=ALL)
        _connection = Connection(
            server,
            raise_exceptions=True,
            user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
            password=self.user.username,
        )
        _connection.bind()
        self.assertTrue(
            Event.objects.filter(
                action=EventAction.LOGIN,
                user={
                    "pk": self.user.pk,
                    "email": self.user.email,
                    "username": self.user.username,
                },
            )
        )
        c, _ = AuthenticatedSession.objects.filter(user_id=self.user.pk).delete()
        self.assertGreaterEqual(c, 1)
        # Give the sign out signal time to propagate
        sleep(3)
    
>       with self.assertRaises(LDAPSessionTerminatedByServerError):
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/e2e/test_provider_ldap.py:624: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <unittest.case._AssertRaisesContext object at 0x7f2ae763fc50>
exc_type = None, exc_value = None, tb = None

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            try:
                exc_name = self.expected.__name__
            except AttributeError:
                exc_name = str(self.expected)
            if self.obj_name:
                self._raiseFailure("{} not raised by {}".format(exc_name,
                                                                self.obj_name))
            else:
>               self._raiseFailure("{} not raised".format(exc_name))

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:272: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <unittest.case._AssertRaisesContext object at 0x7f2ae763fc50>
standardMsg = 'LDAPSessionTerminatedByServerError not raised'

    def _raiseFailure(self, standardMsg):
        msg = self.test_case._formatMessage(self.msg, standardMsg)
>       raise self.test_case.failureException(msg)
E       AssertionError: LDAPSessionTerminatedByServerError not raised

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:209: AssertionError
tests.e2e.test_provider_ldap.TestProviderLDAP::test_ldap_bind_success_starttls_sasl
Stack Traces | 20.9s run time
self = <unittest.case._Outcome object at 0x7eff0d55f950>
test_case = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>
result = <TestCaseFunction test_ldap_bind_success_starttls_sasl>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>
method = <bound method TestProviderLDAP.test_ldap_bind_success_starttls_sasl of <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/decorators.py:60: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_ldap.TestProviderLDAP testMethod=test_ldap_bind_success_starttls_sasl>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_ldap_bind_success_starttls_sasl(self):
        """Test SASL bind with ssl"""
        # Create flow with MTLS Stage
        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        mtls_stage = MutualTLSStage.objects.create(
            name=generate_id(),
            mode=StageMode.REQUIRED,
        )
        mtls_stage.certificate_authorities.add(self.kp)
        login_stage = UserLoginStage.objects.create(
            name=generate_id(),
        )
        FlowStageBinding.objects.create(target=flow, stage=mtls_stage, order=0)
        FlowStageBinding.objects.create(target=flow, stage=login_stage, order=1)
    
        self._prepare(authorization_flow=flow, certificate=self.kp)
    
        tls = Tls(
            local_private_key_file=self.key,
            local_certificate_file=self.cert,
        )
        server = Server("ldap://localhost:3389", tls=tls)
        _connection = Connection(
            server,
            version=3,
            raise_exceptions=True,
            authentication=SASL,
            sasl_mechanism=EXTERNAL,
            sasl_credentials=f"dn:cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
        )
        _connection.start_tls()
        with (
            patch(
                "authentik.enterprise.stages.mtls.stage.MTLSStageView.validate_cert",
                MagicMock(return_value=self.kp.certificate),
            ),
            patch(
                "authentik.enterprise.stages.mtls.stage.MTLSStageView.check_if_user",
                MagicMock(return_value=self.user),
            ) as check_if_user,
        ):
>           _connection.bind()

tests/e2e/test_provider_ldap.py:238: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(loc...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
read_server_info = True, controls = None

    def bind(self,
             read_server_info=True,
             controls=None):
        """Bind to ldap Server with the authentication method and the user defined in the connection
    
        :param read_server_info: reads info from server
        :param controls: LDAP controls to send along with the bind operation
        :type controls: list of tuple
        :return: bool
    
        """
        if log_enabled(BASIC):
            log(BASIC, 'start BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            if self.lazy and not self._executing_deferred:
                if self.strategy.pooled:
                    self.strategy.validate_bind(controls)
                self._deferred_bind = True
                self._bind_controls = controls
                self.bound = True
                if log_enabled(BASIC):
                    log(BASIC, 'deferring bind for <%s>', self)
            else:
                self._deferred_bind = False
                self._bind_controls = None
                if self.closed:  # try to open connection if closed
                    self.open(read_server_info=False)
                if self.authentication == ANONYMOUS:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing anonymous BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, '', auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'anonymous BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SIMPLE:
                    if log_enabled(PROTOCOL):
                        log(PROTOCOL, 'performing simple BIND for <%s>', self)
                    if not self.strategy.pooled:
                        request = bind_operation(self.version, self.authentication, self.user, self.password, auto_encode=self.auto_encode)
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'simple BIND request <%s> sent via <%s>', bind_request_to_dict(request), self)
                        response = self.post_send_single_response(self.send('bindRequest', request, controls))
                    else:
                        response = self.strategy.validate_bind(controls)  # only for REUSABLE
                elif self.authentication == SASL:
                    if self.sasl_mechanism in SASL_AVAILABLE_MECHANISMS:
                        if log_enabled(PROTOCOL):
                            log(PROTOCOL, 'performing SASL BIND for <%s>', self)
                        if not self.strategy.pooled:
>                           response = self.do_sasl_bind(controls)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../ldap3/core/connection.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(loc...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
controls = None

    def do_sasl_bind(self,
                     controls):
        if log_enabled(BASIC):
            log(BASIC, 'start SASL BIND operation via <%s>', self)
        self.last_error = None
        with self.connection_lock:
            result = None
    
            if not self.sasl_in_progress:
                self.sasl_in_progress = True
                try:
                    if self.sasl_mechanism == EXTERNAL:
>                       result = sasl_external(self, controls)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../ldap3/core/connection.py:1338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

connection = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(loc...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
controls = None

    def sasl_external(connection, controls):
>       result = send_sasl_negotiation(connection, controls, connection.sasl_credentials)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../protocol/sasl/external.py:30: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

connection = Connection(server=Server(host='localhost', port=3389, use_ssl=False, allowed_referral_hosts=[('*', True)], tls=Tls(loc...oder=True, auto_range=True, return_empty_attributes=True, auto_encode=True, auto_escape=True, use_referral_cache=False)
controls = None
payload = 'dn:cn=LJC9oOzqZ3ZcANjWEQE0,ou=users,DC=ldap,DC=goauthentik,DC=io'

    def send_sasl_negotiation(connection, controls, payload):
        from ...operation.bind import bind_operation
    
        request = bind_operation(connection.version, SASL, None, None, connection.sasl_mechanism, payload)
>       response = connection.post_send_single_response(connection.send('bindRequest', request, controls))
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../protocol/sasl/sasl.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7eff0d2efbb0>
message_id = 22

    def post_send_single_response(self, message_id):
        """
        Executed after an Operation Request (except Search)
        Returns the result message or None
        """
>       responses, result = self.get_response(message_id)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../ldap3/strategy/sync.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ldap3.strategy.sync.SyncStrategy object at 0x7eff0d2efbb0>
message_id = 22, timeout = 20, get_request = False

    def get_response(self, message_id, timeout=None, get_request=False):
        """
        Get response LDAP messages
        Responses are returned by the underlying connection strategy
        Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
        Result is stored in connection.result
        Responses without result is stored in connection.response
        A tuple (responses, result) is returned
        """
        if timeout is None:
            timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
        response = None
        result = None
        # request = None
        if self._outstanding and message_id in self._outstanding:
            responses = self._get_response(message_id, timeout)
    
            if not responses:
                if log_enabled(ERROR):
                    log(ERROR, 'socket timeout, no response from server for <%s>', self.connection)
                raise LDAPResponseTimeoutError('no response from server')
    
            if responses == SESSION_TERMINATED_BY_SERVER:
                try:  # try to close the session but don't raise any error if server has already closed the session
                    self.close()
                except (socket.error, LDAPExceptionError):
                    pass
                self.connection.last_error = 'session terminated by server'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPSessionTerminatedByServerError(self.connection.last_error)
            elif responses == TRANSACTION_ERROR:  # Novell LDAP Transaction unsolicited notification
                self.connection.last_error = 'transaction error'
                if log_enabled(ERROR):
                    log(ERROR, '<%s> for <%s>', self.connection.last_error, self.connection)
                raise LDAPTransactionError(self.connection.last_error)
    
            # if referral in response opens a new connection to resolve referrals if requested
    
            if responses[-2]['result'] == RESULT_REFERRAL:
                if self.connection.usage:
                    self.connection._usage.referrals_received += 1
                if self.connection.auto_referrals:
                    ref_response, ref_result = self.do_operation_on_referral(self._outstanding[message_id], responses[-2]['referrals'])
                    if ref_response is not None:
                        responses = ref_response + [ref_result]
                        responses.append(RESPONSE_COMPLETE)
                    elif ref_result is not None:
                        responses = [ref_result, RESPONSE_COMPLETE]
    
                    self._referrals = []
    
            if responses:
                result = responses[-2]
                response = responses[:-2]
                self.connection.result = None
                self.connection.response = None
    
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                self._outstanding.pop(message_id)
                self.connection.result = result.copy()
>               raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
E               ldap3.core.exceptions.LDAPInvalidCredentialsResult: LDAPInvalidCredentialsResult - 49 - invalidCredentials - None - None - bindResponse - None

.venv/lib/python3.14.../ldap3/strategy/base.py:403: LDAPInvalidCredentialsResult

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Mar 28, 2026

authentik PR Installation instructions

Instructions for docker-compose

Add the following block to your .env file:

AUTHENTIK_IMAGE=ghcr.io/goauthentik/dev-server
AUTHENTIK_TAG=gh-46b2a1e88f30feeafbdeca74b5c4d3d77f95547e
AUTHENTIK_OUTPOSTS__CONTAINER_IMAGE_BASE=ghcr.io/goauthentik/dev-%(type)s:gh-%(build_hash)s

Afterwards, run the upgrade commands from the latest release notes.

Instructions for Kubernetes

Add the following block to your values.yml file:

authentik:
    outposts:
        container_image_base: ghcr.io/goauthentik/dev-%(type)s:gh-%(build_hash)s
global:
    image:
        repository: ghcr.io/goauthentik/dev-server
        tag: gh-46b2a1e88f30feeafbdeca74b5c4d3d77f95547e

Afterwards, run the upgrade commands from the latest release notes.

BeryJu added 2 commits March 28, 2026 20:58
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
@BeryJu BeryJu force-pushed the providers/ldap/sasl-tls branch from 93fd7bf to 514e73c Compare March 28, 2026 19:58
@BeryJu BeryJu requested a review from a team as a code owner March 28, 2026 20:14
@BeryJu BeryJu force-pushed the providers/ldap/sasl-tls branch from f022bed to 514e73c Compare March 28, 2026 20:19
BeryJu added 2 commits March 28, 2026 21:48
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Signed-off-by: Jens Langhammer <jens@goauthentik.io>

# Conflicts:
#	tests/e2e/test_provider_ldap.py
@BeryJu BeryJu removed the request for review from a team March 30, 2026 01:09
Signed-off-by: Jens Langhammer <jens@goauthentik.io>

# Conflicts:
#	internal/outpost/ldap/bind/memory/memory.go
@netlify
Copy link
Copy Markdown

netlify bot commented Apr 2, 2026

Deploy Preview for authentik-storybook ready!

Name Link
🔨 Latest commit 2005651
🔍 Latest deploy log https://app.netlify.com/projects/authentik-storybook/deploys/69d4fd6db1696e000808da8b
😎 Deploy Preview https://deploy-preview-21212--authentik-storybook.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

# Conflicts:
#	tests/e2e/test_provider_ldap.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant