From c22f11bfcc6be64246f2522c02030af8113c6f4d Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 14:50:02 +0200 Subject: [PATCH 1/6] Update adata to match draft-ietf-jose-json-web-encryption-40. --- jose.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/jose.py b/jose.py index fa2859c..5891143 100644 --- a/jose.py +++ b/jose.py @@ -146,6 +146,8 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', assert _TEMP_VER_KEY not in header header[_TEMP_VER_KEY] = claims[_TEMP_VER_KEY] + adata = _jwe_adata_str(adata, b64encode_url(json_encode(header))) + plaintext = json_encode(claims) # compress (if required) @@ -202,6 +204,8 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): b64decode_url, jwe) header = json_decode(header) + adata = _jwe_adata_str(adata, jwe[0]) + # decrypt cek (_, decipher), _ = JWA[header['alg']] encryption_key = decipher(encryption_key_ciphertext, jwk) @@ -524,6 +528,22 @@ def _validate(claims, validate_claims, expiry_seconds): _check_not_before(now, not_before) +def _jwe_adata_str(adata, header): + if adata == '': + # draft-ietf-jose-json-web-encryption-40 5.1. Message Encryption step 14 + # 'Let the Additional Authenticated Data encryption parameter be + # ASCII(Encoded Protected Header).' + adata = str(header) + else: + # However if a JWE AAD value is present + # (which can only be the case when using the JWE JSON + # Serialization), instead let the Additional Authenticated Data + # encryption parameter be ASCII(Encoded Protected Header || '.' || + # BASE64URL(JWE AAD)). + adata = '.'.join([str(header), adata]) + return adata + + def _jwe_hash_str(ciphertext, iv, adata='', legacy=False): # http://tools.ietf.org/html/ # draft-ietf-jose-json-web-algorithms-24#section-5.2.2.1 From f3edbd31b23ad6ce9be9d2286b3865ee358f988b Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 14:52:14 +0200 Subject: [PATCH 2/6] _jwe_hash_str: Don't use a separator when concatenating. Neither draft-ietf-jose-json-web-algorithms-24 (old reference) nor the current draft-ietf-jose-json-web-algorithms-40 says to concatenate these string with dots, they say: M = MAC(MAC_KEY, A || IV || E || AL), and The concatenation of two values A and B is denoted as A || B. However, since backwards compatibility with old tokens seem to be a concern here, I'm not making the same bugfix to the 'legacy' code branch. --- jose.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jose.py b/jose.py index 5891143..10048b9 100644 --- a/jose.py +++ b/jose.py @@ -545,11 +545,11 @@ def _jwe_adata_str(adata, header): def _jwe_hash_str(ciphertext, iv, adata='', legacy=False): - # http://tools.ietf.org/html/ - # draft-ietf-jose-json-web-algorithms-24#section-5.2.2.1 if legacy: return '.'.join((adata, iv, ciphertext, str(len(adata)))) - return '.'.join((adata, iv, ciphertext, pack("!Q", len(adata) * 8))) + # http://tools.ietf.org/html/ + # draft-ietf-jose-json-web-algorithms-40#section-5.2.2.1 + return ''.join((adata, iv, ciphertext, pack("!Q", len(adata) * 8))) def _jws_hash_str(header, claims): From bc27957317f2521edbee838420a2f774c68600b0 Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 14:58:45 +0200 Subject: [PATCH 3/6] Detect non-legacy tokens using encryption key lenght too. To interoperate with other implementations that won't put the '__v' magic key in the headers, it is necessary to enter the non-legacy code branch using some other mechanism than the presence of '__v' (which I understand to be an invention of this particular implementation). I won't claim to have extensive knowledge or experience with JOSE, but this trick at least lets me interoperate with a dot-NET client. --- jose.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jose.py b/jose.py index 10048b9..91a159c 100644 --- a/jose.py +++ b/jose.py @@ -213,7 +213,8 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): # decrypt body ((_, decipher), _), ((hash_fn, _), mod) = JWA[header['enc']] - if header.get(_TEMP_VER_KEY) == _TEMP_VER: + if header.get(_TEMP_VER_KEY) == _TEMP_VER or \ + len(encryption_key) == mod.digest_size: plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], iv) hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), encryption_key[:-mod.digest_size/2], mod=mod) From dba61877ac52516f1ecfa91382f83cf8354da1ad Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 15:01:18 +0200 Subject: [PATCH 4/6] Check MAC before decrypting claims. As a matter of hygiene, one shouldn't decrypt stuff before checking the authenticity of it. For the legacy mode it is unavoidable though, since the MAC is computed using the plaintext. --- jose.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jose.py b/jose.py index 91a159c..59f3f36 100644 --- a/jose.py +++ b/jose.py @@ -215,16 +215,19 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): if header.get(_TEMP_VER_KEY) == _TEMP_VER or \ len(encryption_key) == mod.digest_size: - plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], iv) hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), encryption_key[:-mod.digest_size/2], mod=mod) + if not const_compare(auth_tag(hash), tag): + raise Error('Mismatched authentication tags') + + plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], iv) else: plaintext = decipher(ciphertext, encryption_key[:-mod.digest_size], iv) hash = hash_fn(_jwe_hash_str(plaintext, iv, adata, True), encryption_key[-mod.digest_size:], mod=mod) - if not const_compare(auth_tag(hash), tag): - raise Error('Mismatched authentication tags') + if not const_compare(auth_tag(hash), tag): + raise Error('Mismatched authentication tags') if 'zip' in header: try: From dddc250ee11b8898691fc3f6e95dae97f1e419c6 Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 15:08:29 +0200 Subject: [PATCH 5/6] Refactor key slicing to increase readability (IMO). --- jose.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/jose.py b/jose.py index 59f3f36..099eb72 100644 --- a/jose.py +++ b/jose.py @@ -164,10 +164,11 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', ((cipher, _), key_size), ((hash_fn, _), hash_mod) = JWA[enc] iv = rng(AES.block_size) encryption_key = rng(hash_mod.digest_size) + enc_key = encryption_key[-hash_mod.digest_size/2:] # first half + mac_key = encryption_key[:-hash_mod.digest_size/2] # second half - ciphertext = cipher(plaintext, encryption_key[-hash_mod.digest_size/2:], iv) - hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), - encryption_key[:-hash_mod.digest_size/2], hash_mod) + ciphertext = cipher(plaintext, enc_key, iv) + hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), mac_key, hash_mod) # cek encryption (cipher, _), _ = JWA[alg] @@ -213,18 +214,21 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): # decrypt body ((_, decipher), _), ((hash_fn, _), mod) = JWA[header['enc']] + enc_key = encryption_key[-mod.digest_size/2:] # first half + mac_key = encryption_key[:-mod.digest_size/2] # second half + if header.get(_TEMP_VER_KEY) == _TEMP_VER or \ len(encryption_key) == mod.digest_size: - hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), - encryption_key[:-mod.digest_size/2], mod=mod) + hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), mac_key, mod=mod) if not const_compare(auth_tag(hash), tag): raise Error('Mismatched authentication tags') - plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], iv) + plaintext = decipher(ciphertext, enc_key, iv) else: - plaintext = decipher(ciphertext, encryption_key[:-mod.digest_size], iv) - hash = hash_fn(_jwe_hash_str(plaintext, iv, adata, True), - encryption_key[-mod.digest_size:], mod=mod) + enc_key = encryption_key[:-mod.digest_size] # first half + mac_key = encryption_key[-mod.digest_size:] # second half + plaintext = decipher(ciphertext, enc_key, iv) + hash = hash_fn(_jwe_hash_str(plaintext, iv, adata, True), mac_key, mod=mod) if not const_compare(auth_tag(hash), tag): raise Error('Mismatched authentication tags') From 5c41e7de07987f67adfdf3eb648b6c4243b6f873 Mon Sep 17 00:00:00 2001 From: Fredrik Thulin Date: Tue, 22 Sep 2015 15:56:25 +0200 Subject: [PATCH 6/6] Bugfix adata handling to work with legacy encryption and compression. Good thing there were test cases covering these cases... --- jose.py | 8 ++++---- tests.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/jose.py b/jose.py index 099eb72..fe5393c 100644 --- a/jose.py +++ b/jose.py @@ -146,8 +146,6 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', assert _TEMP_VER_KEY not in header header[_TEMP_VER_KEY] = claims[_TEMP_VER_KEY] - adata = _jwe_adata_str(adata, b64encode_url(json_encode(header))) - plaintext = json_encode(claims) # compress (if required) @@ -160,6 +158,8 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', 'Unsupported compression algorithm: {}'.format(compression)) plaintext = compress(plaintext) + adata = _jwe_adata_str(adata, b64encode_url(json_encode(header))) + # body encryption/hash ((cipher, _), key_size), ((hash_fn, _), hash_mod) = JWA[enc] iv = rng(AES.block_size) @@ -205,8 +205,6 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): b64decode_url, jwe) header = json_decode(header) - adata = _jwe_adata_str(adata, jwe[0]) - # decrypt cek (_, decipher), _ = JWA[header['alg']] encryption_key = decipher(encryption_key_ciphertext, jwk) @@ -219,6 +217,8 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): if header.get(_TEMP_VER_KEY) == _TEMP_VER or \ len(encryption_key) == mod.digest_size: + adata = _jwe_adata_str(adata, jwe[0]) + hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata), mac_key, mod=mod) if not const_compare(auth_tag(hash), tag): raise Error('Mismatched authentication tags') diff --git a/tests.py b/tests.py index d502698..b49768b 100644 --- a/tests.py +++ b/tests.py @@ -37,9 +37,13 @@ def legacy_encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', if compression is not None: header['zip'] = compression try: - (compress, _) = jose.COMPRESSION[compression] + if compression == 'BAD': + # facilitate test for bad compression algorithm + (compress, _) = jose.COMPRESSION['DEF'] + else: + (compress, _) = jose.COMPRESSION[compression] except KeyError: - raise Error( + raise jose.Error( 'Unsupported compression algorithm: {}'.format(compression)) plaintext = compress(plaintext) @@ -274,13 +278,9 @@ def test_encrypt_invalid_compression_error(self): pass def test_decrypt_invalid_compression_error(self): - jwe = jose.encrypt(claims, rsa_pub_key, compression='DEF') - header = jose.b64encode_url( - '{"alg": "RSA-OAEP", ''"enc": "A128CBC-HS256", "__v": 1,' - '"zip": "BAD"}') - + jwe = legacy_encrypt(claims, rsa_pub_key, compression='BAD') try: - jose.decrypt(jose.JWE(*((header,) + (jwe[1:]))), rsa_priv_key) + jose.decrypt(jwe, rsa_priv_key) self.fail() except jose.Error as e: self.assertEqual(