from typing import Any, Dict, List, Optional, Tuple, Union
from asn1crypto import pem
from cbor2 import CBORTag
from .cbor_processor import CBORProcessor
from .const import (
COSE_ALGORITHMS_CEK,
COSE_ALGORITHMS_CEK_NON_AEAD,
COSE_ALGORITHMS_CKDM,
COSE_ALGORITHMS_CKDM_KEY_AGREEMENT,
COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_DIRECT,
COSE_ALGORITHMS_HPKE,
COSE_ALGORITHMS_KEY_WRAP,
COSE_ALGORITHMS_MAC,
COSE_ALGORITHMS_RECIPIENT,
COSE_ALGORITHMS_SIGNATURE,
)
from .cose_key_interface import COSEKeyInterface
from .recipient_algs.hpke import HPKE
from .recipient_interface import RecipientInterface
from .recipients import Recipients
from .signer import Signer
from .utils import to_cose_header
[docs]
class COSE(CBORProcessor):
"""
A COSE (CBOR Object Signing and Encryption) implementation built on top of
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_.
"""
def __init__(
self,
alg_auto_inclusion: bool = False,
kid_auto_inclusion: bool = False,
verify_kid: bool = False,
ca_certs: str = "",
):
if not isinstance(alg_auto_inclusion, bool):
raise ValueError("alg_auto_inclusion should be bool.")
self._alg_auto_inclusion = alg_auto_inclusion
if not isinstance(kid_auto_inclusion, bool):
raise ValueError("kid_auto_inclusion should be bool.")
self._kid_auto_inclusion = kid_auto_inclusion
if not isinstance(verify_kid, bool):
raise ValueError("verify_kid should be bool.")
self._verify_kid = verify_kid
self._ca_certs = []
if ca_certs:
if not isinstance(ca_certs, str):
raise ValueError("ca_certs should be str.")
self._trust_roots: List[bytes] = []
with open(ca_certs, "rb") as f:
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True):
self._ca_certs.append(der_bytes)
[docs]
@classmethod
def new(
    cls,
    alg_auto_inclusion: bool = False,
    kid_auto_inclusion: bool = False,
    verify_kid: bool = False,
    ca_certs: str = "",
):
    """
    Alternate constructor; forwards all options to the class initializer.

    Args:
        alg_auto_inclusion(bool): Whether the ``alg`` parameter is put into
            a proper header bucket automatically.
        kid_auto_inclusion(bool): Whether the ``kid`` parameter is put into
            a proper header bucket automatically.
        verify_kid(bool): Whether ``kid`` verification is mandatory.
        ca_certs(str): The path to a file which contains a concatenated list
            of trusted root certificates. You should specify private CA
            certificates in your target system. There should be no need to
            use the public CA certificates for the Web PKI.
    Returns:
        A new instance of ``cls`` built from the given options.
    """
    options = (alg_auto_inclusion, kid_auto_inclusion, verify_kid, ca_certs)
    return cls(*options)
@property
def alg_auto_inclusion(self) -> bool:
    """
    If this property is True, an encode_and_*() function will automatically
    set the ``alg`` parameter in the header from the COSEKey argument.
    """
    return self._alg_auto_inclusion

@alg_auto_inclusion.setter
def alg_auto_inclusion(self, alg_auto_inclusion: bool):
    # Enforce the same type contract as the constructor so the flag can
    # never hold a non-bool value after construction.
    if not isinstance(alg_auto_inclusion, bool):
        raise ValueError("alg_auto_inclusion should be bool.")
    self._alg_auto_inclusion = alg_auto_inclusion
@property
def kid_auto_inclusion(self) -> bool:
    """
    If this property is True, an encode_and_*() function will automatically
    set the ``kid`` parameter in the header from the COSEKey argument.
    """
    return self._kid_auto_inclusion

@kid_auto_inclusion.setter
def kid_auto_inclusion(self, kid_auto_inclusion: bool):
    # Enforce the same type contract as the constructor so the flag can
    # never hold a non-bool value after construction.
    if not isinstance(kid_auto_inclusion, bool):
        raise ValueError("kid_auto_inclusion should be bool.")
    self._kid_auto_inclusion = kid_auto_inclusion
@property
def verify_kid(self) -> bool:
    """
    If this property is True, the decode() function will perform the verification
    and decoding process only if the ``kid`` of the COSE data to be decoded
    exactly matches one of the ``kid`` values in the key list given as an argument.
    """
    return self._verify_kid

@verify_kid.setter
def verify_kid(self, verify_kid: bool):
    # Enforce the same type contract as the constructor so the flag can
    # never hold a non-bool value after construction.
    if not isinstance(verify_kid, bool):
        raise ValueError("verify_kid should be bool.")
    self._verify_kid = verify_kid
[docs]
def encode(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface] = None,
    protected: Optional[dict] = None,
    unprotected: Optional[dict] = None,
    recipients: Optional[List[RecipientInterface]] = None,
    signers: Optional[List[Signer]] = None,
    external_aad: bytes = b"",
    out: str = "",
) -> Union[bytes, CBORTag]:
    """
    Encodes COSE message with MAC, signing and encryption.

    The message type is chosen from the headers, key, recipients and signers.

    Args:
        payload (bytes): A content to be MACed, signed or encrypted.
        key (Optional[COSEKeyInterface]): A content encryption key as COSEKey.
        protected (Optional[dict]): Parameters that are to be cryptographically protected.
        unprotected (Optional[dict]): Parameters that are not cryptographically protected.
        recipients (Optional[List[RecipientInterface]]): A list of recipient
            information structures. ``None`` (the default) means no recipients.
        signers (Optional[List[Signer]]): A list of signer information objects
            for multiple signer cases. ``None`` (the default) means no signers.
        external_aad(bytes): External additional authenticated data supplied
            by application.
        out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
            ``"cbor2/CBORTag"`` is specified, this function will return encoded
            data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
            ``CBORTag`` object. If any other value is specified, it will return
            encoded data as bytes.
    Returns:
        Union[bytes, CBORTag]: A byte string of the encoded COSE or a
            cbor2.CBORTag object.
    Raises:
        ValueError: Invalid arguments.
        EncodeError: Failed to encode data.
    """
    # Use fresh lists per call instead of shared mutable default arguments.
    recipients = [] if recipients is None else recipients
    signers = [] if signers is None else signers

    p, u = self._encode_headers(key, protected, unprotected)
    typ = self._validate_cose_message(key, p, u, recipients, signers)
    if typ == 0:
        return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out)
    if typ == 1:
        return self._encode_and_mac(payload, key, p, u, recipients, external_aad, out)
    # Any other message type is handled as signing (typ == 2).
    return self._encode_and_sign(payload, key, p, u, signers, external_aad, out)
[docs]
def encode_and_encrypt(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface] = None,
    protected: Optional[dict] = None,
    unprotected: Optional[dict] = None,
    recipients: Optional[List[RecipientInterface]] = None,
    external_aad: bytes = b"",
    out: str = "",
) -> Union[bytes, CBORTag]:
    """
    Encodes data with encryption.

    Args:
        payload (bytes): A content to be encrypted.
        key (Optional[COSEKeyInterface]): A content encryption key as COSEKey.
        protected (Optional[dict]): Parameters that are to be cryptographically protected.
        unprotected (Optional[dict]): Parameters that are not cryptographically protected.
        recipients (Optional[List[RecipientInterface]]): A list of recipient
            information structures. ``None`` (the default) means no recipients.
        external_aad(bytes): External additional authenticated data supplied
            by application.
        out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
            ``"cbor2/CBORTag"`` is specified, this function will return encoded
            data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
            ``CBORTag`` object. If any other value is specified, it will return
            encoded data as bytes.
    Returns:
        Union[bytes, CBORTag]: A byte string of the encoded COSE or a
            cbor2.CBORTag object.
    Raises:
        ValueError: Invalid arguments, or the headers/key do not describe an
            encryption message.
        EncodeError: Failed to encode data.
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    recipients = [] if recipients is None else recipients

    p, u = self._encode_headers(key, protected, unprotected)
    typ = self._validate_cose_message(key, p, u, recipients, [])
    if typ != 0:
        raise ValueError("The COSE message is not suitable for COSE Encrypt0/Encrypt.")
    return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out)
[docs]
def encode_and_mac(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface] = None,
    protected: Optional[dict] = None,
    unprotected: Optional[dict] = None,
    recipients: Optional[List[RecipientInterface]] = None,
    external_aad: bytes = b"",
    out: str = "",
) -> Union[bytes, CBORTag]:
    """
    Encodes data with MAC.

    Args:
        payload (bytes): A content to be MACed.
        key (Optional[COSEKeyInterface]): A COSE key as a MAC Authentication key.
        protected (Optional[dict]): Parameters that are to be cryptographically protected.
        unprotected (Optional[dict]): Parameters that are not cryptographically protected.
        recipients (Optional[List[RecipientInterface]]): A list of recipient
            information structures. ``None`` (the default) means no recipients.
        external_aad(bytes): External additional authenticated data supplied by application.
        out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
            is specified, this function will return encoded data as
            `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
            If any other value is specified, it will return encoded data as bytes.
    Returns:
        Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object.
    Raises:
        ValueError: Invalid arguments, or the headers/key do not describe a
            MAC message.
        EncodeError: Failed to encode data.
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    recipients = [] if recipients is None else recipients

    p, u = self._encode_headers(key, protected, unprotected)
    typ = self._validate_cose_message(key, p, u, recipients, [])
    if typ != 1:
        raise ValueError("The COSE message is not suitable for COSE MAC0/MAC.")
    return self._encode_and_mac(payload, key, p, u, recipients, external_aad, out)
[docs]
def encode_and_sign(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface] = None,
    protected: Optional[dict] = None,
    unprotected: Optional[dict] = None,
    signers: Optional[List[Signer]] = None,
    external_aad: bytes = b"",
    out: str = "",
) -> Union[bytes, CBORTag]:
    """
    Encodes data with signing.

    Args:
        payload (bytes): A content to be signed.
        key (Optional[COSEKeyInterface]): A signing key for single signer
            cases. When the ``signers`` parameter is set, this ``key`` will
            be ignored and should not be set.
        protected (Optional[dict]): Parameters that are to be cryptographically protected.
        unprotected (Optional[dict]): Parameters that are not cryptographically protected.
        signers (Optional[List[Signer]]): A list of signer information objects
            for multiple signer cases. ``None`` (the default) means no signers.
        external_aad(bytes): External additional authenticated data supplied
            by application.
        out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
            ``"cbor2/CBORTag"`` is specified, this function will return encoded
            data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
            ``CBORTag`` object. If any other value is specified, it will return
            encoded data as bytes.
    Returns:
        Union[bytes, CBORTag]: A byte string of the encoded COSE or a
            cbor2.CBORTag object.
    Raises:
        ValueError: Invalid arguments, or the headers/key do not describe a
            signing message.
        EncodeError: Failed to encode data.
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    signers = [] if signers is None else signers

    p, u = self._encode_headers(key, protected, unprotected)
    typ = self._validate_cose_message(key, p, u, [], signers)
    if typ != 2:
        raise ValueError("The COSE message is not suitable for COSE Sign0/Sign.")
    return self._encode_and_sign(payload, key, p, u, signers, external_aad, out)
[docs]
def decode(
    self,
    data: Union[bytes, CBORTag],
    keys: Union[COSEKeyInterface, List[COSEKeyInterface]],
    context: Optional[Union[Dict[str, Any], List[Any]]] = None,
    external_aad: bytes = b"",
    detached_payload: Optional[bytes] = None,
) -> bytes:
    """
    Verifies and decodes COSE data, and returns only the payload.

    This delegates to ``decode_with_headers()`` and discards the first two
    elements of its result.

    Args:
        data (Union[bytes, CBORTag]): A byte string or cbor2.CBORTag of an
            encoded data.
        keys (Union[COSEKeyInterface, List[COSEKeyInterface]]): COSE key(s)
            to verify and decrypt the encoded data.
        context (Optional[Union[Dict[str, Any], List[Any]]]): A context information
            structure for key derivation functions.
        external_aad(bytes): External additional authenticated data supplied by
            application.
        detached_payload (Optional[bytes]): The detached payload that should be
            verified with data.
    Returns:
        bytes: A byte string of decoded payload.
    Raises:
        ValueError: Invalid arguments.
        DecodeError: Failed to decode data.
        VerifyError: Failed to verify data.
    """
    decoded = self.decode_with_headers(data, keys, context, external_aad, detached_payload)
    _first, _second, payload = decoded
    return payload
def _encode_headers(
    self,
    key: Optional[COSEKeyInterface],
    protected: Optional[dict],
    unprotected: Optional[dict],
) -> Tuple[Dict[int, Any], Dict[int, Any]]:
    """
    Normalizes the header buckets and applies automatic alg/kid inclusion.

    Args:
        key: The key whose ``alg``/``kid`` may be copied into the headers.
        protected: Caller-supplied protected header parameters.
        unprotected: Caller-supplied unprotected header parameters.
    Returns:
        The ``(protected, unprotected)`` pair produced by ``to_cose_header``,
        with ``alg`` (label 1) and ``kid`` (label 4) filled in when the
        corresponding auto-inclusion flag is enabled.
    Raises:
        ValueError: A non-AEAD algorithm is used with a non-empty protected header.
    """
    p = to_cose_header(protected)
    u = to_cose_header(unprotected)
    non_aead_algs = COSE_ALGORITHMS_CEK_NON_AEAD.values()

    if key is not None:
        if self._alg_auto_inclusion:
            # Non-AEAD algorithms must not populate the protected bucket,
            # so their alg goes to the unprotected one.
            bucket = u if key.alg in non_aead_algs else p
            bucket[1] = key.alg
        if self._kid_auto_inclusion and key.kid:
            u[4] = key.kid

    # Check the protected header is empty if the algorithm is non AEAD (AES-CBC or AES-CTR)
    # because section 4 of RFC9459 says "The 'protected' header MUST be a zero-length byte string."
    if 1 in p:
        alg = p[1]
    else:
        alg = u.get(1, 0)
    if len(p) > 0 and alg in non_aead_algs:
        raise ValueError("protected header MUST be zero-length")
    return p, u
def _decode_headers(self, protected: Any, unprotected: Any) -> Tuple[Dict[int, Any], Dict[int, Any]]:
p: Union[Dict[int, Any], bytes]
p = self._loads(protected) if protected else {}
if isinstance(p, bytes):
if len(p) > 0:
raise ValueError("Invalid protected header.")
p = {}
u: Dict[int, Any] = unprotected
if not isinstance(u, dict):
raise ValueError("unprotected header should be dict.")
return p, u
def _validate_cose_message(
self,
key: Optional[COSEKeyInterface],
p: Dict[int, Any],
u: Dict[int, Any],
recipients: List[RecipientInterface],
signers: List[Signer],
) -> int:
if len(recipients) > 0 and len(signers) > 0:
raise ValueError("Both recipients and signers are specified.")
h: Dict[int, Any] = {}
iv_count: int = 0
for k, v in p.items():
if k == 2: # crit
if not isinstance(v, list):
raise ValueError("crit parameter must have list.")
for crit in v:
if not isinstance(crit, int):
raise ValueError("Integer labels for crit are only supported.")
if crit >= 0 and crit <= 7:
raise ValueError("Integer labels for crit in the range of 0 to 7 should be omitted.")
if crit not in p.keys() and crit not in u.keys():
raise ValueError(f"Integer label({crit}) for crit not found in the headers.")
if k == 5 or k == 6: # IV or Partial IV
if not isinstance(v, bytes):
raise ValueError("IV and Partial IV must be bstr.")
iv_count += 1
h[k] = v
for k, v in u.items():
if k == 2: # crit
raise ValueError("crit(2) must be placed only in protected header.")
if k == 5 or k == 6: # IV or Partial IV
if not isinstance(v, bytes):
raise ValueError("IV and Partial IV must be bstr.")
iv_count += 1
h[k] = v
if len(h) != len(p) + len(u):
raise ValueError("The same keys are both in protected and unprotected headers.")
if iv_count > 1:
raise ValueError("IV and Partial IV must not both be present in the same security layer.")
if 1 in p and 1 in u:
raise ValueError("alg appear both in protected and unprotected.")
alg = p[1] if 1 in p else u.get(1, 0)
if len(signers) > 0:
return 2 # Sign
if len(recipients) == 0:
if key is None:
raise ValueError("key should be set.")
if alg not in COSE_ALGORITHMS_HPKE.values() and key.alg != alg:
raise ValueError(f"The alg({alg}) in the headers does not match the alg({key.alg}) in the populated key.")
if alg in COSE_ALGORITHMS_CEK.values():
return 0 # Encrypt0
if alg in COSE_ALGORITHMS_HPKE.values():
return 0 # Encrypt0
if alg in COSE_ALGORITHMS_MAC.values():
return 1 # MAC0
if alg in COSE_ALGORITHMS_SIGNATURE.values():
return 2 # Sign0
raise ValueError(f"Invalid alg for single-layer COSE message: {alg}.")
if recipients[0].alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_DIRECT.values():
if len(recipients) > 1:
raise ValueError("There must be only one recipient in direct key agreement mode.")
if recipients[0].alg in COSE_ALGORITHMS_CKDM.values(): # Direct encryption mode.
for r in recipients:
if r.alg not in COSE_ALGORITHMS_CKDM.values():
raise ValueError("All of the recipient alg must be the direct encryption mode.")
if len(r.recipients) > 0:
raise ValueError("Recipients for direct encryption mode don't have recipients.")
if len(r.ciphertext) > 0:
raise ValueError(
"The ciphertext in the recipients for direct encryption mode must be a zero-length byte string."
)
if recipients[0].alg in COSE_ALGORITHMS_KEY_WRAP.values():
for r in recipients:
if len(r.protected) > 0:
raise ValueError("The protected header must be a zero-length string in key wrap mode with an AE algorithm.")
if (
recipients[0].alg == -6 # direct
or recipients[0].alg in COSE_ALGORITHMS_HPKE.values()
or recipients[0].alg in COSE_ALGORITHMS_KEY_WRAP.values()
):
if key is None:
raise ValueError("key should be set.")
if key.alg != alg:
raise ValueError(f"The alg({alg}) in the headers does not match the alg({key.alg}) in the populated key.")
if alg in COSE_ALGORITHMS_CEK.values():
return 0 # Encrypt
if alg in COSE_ALGORITHMS_HPKE.values():
return 0 # Encrypt
if alg in COSE_ALGORITHMS_MAC.values():
return 1 # MAC
raise ValueError(f"Invalid alg for single-layer COSE message: {alg}.")
if (
recipients[0].alg in COSE_ALGORITHMS_CKDM.values()
or recipients[0].alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT.values()
):
if recipients[0].context[0] in COSE_ALGORITHMS_CEK.values():
return 0 # Encrypt
if recipients[0].context[0] in COSE_ALGORITHMS_HPKE.values():
return 0 # Encrypt
if recipients[0].context[0] in COSE_ALGORITHMS_KEY_WRAP.values():
return 0 # Encrypt
if recipients[0].context[0] in COSE_ALGORITHMS_MAC.values():
return 1 # MAC
raise ValueError(f"Invalid alg in recipients' context information: {recipients[0]._context[0]}.")
raise ValueError(f"Unsupported or unknown alg: {alg}.")
def _encode_and_encrypt(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface],
    p: Dict[int, Any],
    u: Dict[int, Any],
    recipients: List[RecipientInterface],
    external_aad: bytes,
    out: str,
) -> Union[bytes, CBORTag]:
    """
    Builds a COSE_Encrypt0 (tag 16) or COSE_Encrypt (tag 96) message.

    Args:
        payload: The plaintext to encrypt.
        key: The content encryption key; may be None when a recipient
            derives the key.
        p: The protected header map.
        u: The unprotected header map. A generated nonce is stored under
            label 5 when the caller did not provide one.
        recipients: Recipient structures; empty selects Encrypt0.
        external_aad: Externally supplied additional authenticated data.
        out: ``"cbor2/CBORTag"`` to get a ``CBORTag``; anything else returns bytes.
    Returns:
        Union[bytes, CBORTag]: The encoded message.
    Raises:
        ValueError: No usable key, or the key cannot generate a nonce.
        NotImplementedError: The first recipient uses an unsupported algorithm.
    """
    b_protected = self._dumps(p) if p else b""
    ciphertext: bytes = b""

    # Encrypt0
    if len(recipients) == 0:
        enc_structure = ["Encrypt0", b_protected, external_aad]
        aad = self._dumps(enc_structure)
        if 1 in p and p[1] in COSE_ALGORITHMS_HPKE.values():  # HPKE
            hpke = HPKE(p, u, recipient_key=key)
            encoded, _ = hpke.encode(payload, aad)
            res = CBORTag(16, encoded)
            return res if out == "cbor2/CBORTag" else self._dumps(res)
        if key is None:
            raise ValueError("key should be set.")
        if 5 not in u:  # nonce
            try:
                u[5] = key.generate_nonce()
            except NotImplementedError as err:
                # Chain explicitly so the traceback shows the real cause.
                raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") from err
        ciphertext = key.encrypt(payload, u[5], aad)
        res = CBORTag(16, [b_protected, u, ciphertext])
        return res if out == "cbor2/CBORTag" else self._dumps(res)

    # Encrypt
    if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values():
        raise NotImplementedError("Algorithms other than direct are not supported for recipients.")

    recs = []
    b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b""
    cek: Optional[COSEKeyInterface] = None
    for rec in recipients:
        # An empty recipient protected header is represented as b"".
        aad = self._dumps(["Enc_Recipient", self._dumps(rec.protected) if len(rec.protected) > 0 else b"", external_aad])
        encoded, derived_key = rec.encode(b_key, aad)
        # Prefer a key derived by the recipient; otherwise use the given key.
        cek = derived_key if derived_key else key
        recs.append(encoded)

    if cek is None:
        raise ValueError("key should be set.")
    if 5 not in u:  # nonce
        try:
            u[5] = cek.generate_nonce()
        except NotImplementedError as err:
            # Chain explicitly so the traceback shows the real cause.
            raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") from err
    enc_structure = ["Encrypt", b_protected, external_aad]
    aad = self._dumps(enc_structure)
    ciphertext = cek.encrypt(payload, u[5], aad)
    cose_enc: List[Any] = [b_protected, u, ciphertext]
    cose_enc.append(recs)
    res = CBORTag(96, cose_enc)
    return res if out == "cbor2/CBORTag" else self._dumps(res)
def _encode_and_mac(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface],
    p: Dict[int, Any],
    u: Dict[int, Any],
    recipients: List[RecipientInterface],
    external_aad: bytes,
    out: str,
) -> Union[bytes, CBORTag]:
    """
    Builds a COSE_Mac0 (tag 17) or COSE_Mac (tag 97) message.

    Args:
        payload: The content to authenticate.
        key: The MAC key; may be None when a recipient derives the key.
        p: The protected header map.
        u: The unprotected header map.
        recipients: Recipient structures; empty selects MAC0.
        external_aad: Externally supplied additional authenticated data.
        out: ``"cbor2/CBORTag"`` to get a ``CBORTag``; anything else returns bytes.
    Returns:
        Union[bytes, CBORTag]: The encoded message.
    Raises:
        ValueError: No usable key is available.
        NotImplementedError: The first recipient uses an unsupported algorithm.
    """
    as_tag = out == "cbor2/CBORTag"
    b_protected = self._dumps(p) if p else b""

    # MAC0
    if len(recipients) == 0:
        if key is None:
            raise ValueError("key should be set.")
        to_be_maced = self._dumps(["MAC0", b_protected, external_aad, payload])
        mac0 = CBORTag(17, [b_protected, u, payload, key.sign(to_be_maced)])
        return mac0 if as_tag else self._dumps(mac0)

    # MAC
    if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values():
        raise NotImplementedError("Algorithms other than direct are not supported for recipients.")

    mac_structure = ["MAC", b_protected, external_aad, payload]
    b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b""
    encoded_recipients = []
    for rec in recipients:
        # NOTE(review): unlike _encode_and_encrypt, an empty recipient
        # protected header is serialized as-is here rather than mapped to
        # b"" - confirm this difference is intended.
        rec_aad = self._dumps(["Mac_Recipient", self._dumps(rec.protected), external_aad])
        encoded, derived_key = rec.encode(b_key, rec_aad)
        if derived_key:
            # A key derived by the recipient replaces the current MAC key.
            key = derived_key
        encoded_recipients.append(encoded)

    if key is None:
        raise ValueError("key should be set.")
    tag = key.sign(self._dumps(mac_structure))
    mac = CBORTag(97, [b_protected, u, payload, tag, encoded_recipients])
    return mac if as_tag else self._dumps(mac)
def _encode_and_sign(
    self,
    payload: bytes,
    key: Optional[COSEKeyInterface],
    p: Dict[int, Any],
    u: Dict[int, Any],
    signers: List[Signer],
    external_aad: bytes,
    out: str,
) -> Union[bytes, CBORTag]:
    """
    Builds a single-signer (tag 18) or multi-signer (tag 98) signed message.

    Args:
        payload: The content to sign.
        key: The signing key, used only when ``signers`` is empty.
        p: The protected header map.
        u: The unprotected header map.
        signers: Signer objects for the multi-signer case.
        external_aad: Externally supplied additional authenticated data.
        out: ``"cbor2/CBORTag"`` to get a ``CBORTag``; anything else returns bytes.
    Returns:
        Union[bytes, CBORTag]: The encoded message.
    """
    as_tag = out == "cbor2/CBORTag"
    b_protected = self._dumps(p) if p else b""

    # Signature1
    single_signer = key is not None and not signers
    if single_signer:
        to_be_signed = self._dumps(["Signature1", b_protected, external_aad, payload])
        sign1 = CBORTag(18, [b_protected, u, payload, key.sign(to_be_signed)])
        return sign1 if as_tag else self._dumps(sign1)

    # Signature
    signatures = []
    for signer in signers:
        to_be_signed = self._dumps(["Signature", b_protected, signer.protected, external_aad, payload])
        # sign() is called for its effect; the result is read back from
        # the signer's ``signature`` attribute.
        signer.sign(to_be_signed)
        signatures.append([signer.protected, signer.unprotected, signer.signature])
    sign = CBORTag(98, [b_protected, u, payload, signatures])
    return sign if as_tag else self._dumps(sign)
def _filter_by_key_ops(self, keys: List[COSEKeyInterface], op: int) -> List[COSEKeyInterface]:
res: List[COSEKeyInterface] = []
for k in keys:
if op in k.key_ops:
res.append(k)
if len(res) == 0:
res = keys
return res
def _get_alg(self, protected: Any) -> int:
return protected[1] if isinstance(protected, dict) and 1 in protected else 0
def _get_kid(self, protected: Any, unprotected: dict) -> bytes:
kid = b""
if isinstance(protected, dict) and 4 in protected:
kid = protected[4]
elif 4 in unprotected:
kid = unprotected[4]
elif self._verify_kid:
raise ValueError("kid should be specified.")
return kid