from typing import Any, Dict, List, Optional, Union
from cbor2 import CBORTag
from .cbor_processor import CBORProcessor
from .const import COSE_ALGORITHMS_RECIPIENT
from .cose_key_interface import COSEKeyInterface
from .recipient_interface import RecipientInterface
from .recipients import Recipients
from .signer import Signer
from .utils import to_cose_header
class COSE(CBORProcessor):
    """
    A COSE (CBOR Object Signing and Encryption) implementation built on top of
    `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_.
    """

    # CBOR tag -> (key_ops value used to pre-filter candidate keys, expected
    # length of the tagged array, structure name used in error messages).
    _TAG_SPECS: Dict[int, Any] = {
        16: (4, 3, "Encrypt0"),
        96: (4, 4, "Encrypt"),
        17: (10, 4, "MAC0"),
        97: (10, 5, "MAC"),
        18: (2, 4, "Signature1"),
        98: (2, 4, "Signature"),
    }

    def __init__(
        self,
        alg_auto_inclusion: bool = False,
        kid_auto_inclusion: bool = False,
        verify_kid: bool = False,
    ):
        """
        Initializes the encoder/decoder options.

        Args:
            alg_auto_inclusion(bool): Whether ``alg`` is set in the header
                automatically on encoding.
            kid_auto_inclusion(bool): Whether ``kid`` is set in the header
                automatically on encoding.
            verify_kid(bool): Whether ``kid`` verification is mandatory on
                decoding.
        Raises:
            ValueError: One of the arguments is not a bool.
        """
        if not isinstance(alg_auto_inclusion, bool):
            raise ValueError("alg_auto_inclusion should be bool.")
        self._alg_auto_inclusion = alg_auto_inclusion

        if not isinstance(kid_auto_inclusion, bool):
            raise ValueError("kid_auto_inclusion should be bool.")
        self._kid_auto_inclusion = kid_auto_inclusion

        if not isinstance(verify_kid, bool):
            raise ValueError("verify_kid should be bool.")
        self._verify_kid = verify_kid

    @classmethod
    def new(
        cls,
        alg_auto_inclusion: bool = False,
        kid_auto_inclusion: bool = False,
        verify_kid: bool = False,
    ):
        """
        Constructor.

        Args:
            alg_auto_inclusion(bool): The indicator whether ``alg`` parameter is included
                in a proper header bucket automatically or not.
            kid_auto_inclusion(bool): The indicator whether ``kid`` parameter is included
                in a proper header bucket automatically or not.
            verify_kid(bool): The indicator whether ``kid`` verification is mandatory or
                not.
        """
        return cls(alg_auto_inclusion, kid_auto_inclusion, verify_kid)

    @property
    def alg_auto_inclusion(self) -> bool:
        """
        If this property is True, an encode_and_*() function will automatically
        set the ``alg`` parameter in the header from the COSEKey argument.
        """
        return self._alg_auto_inclusion

    @alg_auto_inclusion.setter
    def alg_auto_inclusion(self, alg_auto_inclusion: bool):
        self._alg_auto_inclusion = alg_auto_inclusion

    @property
    def kid_auto_inclusion(self) -> bool:
        """
        If this property is True, an encode_and_*() function will automatically
        set the ``kid`` parameter in the header from the COSEKey argument.
        """
        return self._kid_auto_inclusion

    @kid_auto_inclusion.setter
    def kid_auto_inclusion(self, kid_auto_inclusion: bool):
        self._kid_auto_inclusion = kid_auto_inclusion

    @property
    def verify_kid(self) -> bool:
        """
        If this property is True, the decode() function will perform the verification
        and decoding process only if the ``kid`` of the COSE data to be decoded and
        one of the ``kid`` s in the key list given as an argument match exactly.
        """
        return self._verify_kid

    @verify_kid.setter
    def verify_kid(self, verify_kid: bool):
        self._verify_kid = verify_kid

    def encode_and_mac(
        self,
        payload: bytes,
        key: COSEKeyInterface,
        protected: Optional[Union[dict, bytes]] = None,
        unprotected: Optional[dict] = None,
        recipients: Optional[List[RecipientInterface]] = None,
        external_aad: bytes = b"",
        out: str = "",
    ) -> Union[bytes, CBORTag]:
        """
        Encodes data with MAC.

        Without ``recipients`` a MAC0 structure (CBOR tag 17) is produced;
        with ``recipients`` a MAC structure (CBOR tag 97) is produced.

        Args:
            payload (bytes): A content to be MACed.
            key (COSEKeyInterface): A COSE key as a MAC Authentication key.
            protected (Optional[Union[dict, bytes]]): Parameters that are to be cryptographically
                protected. A ``bytes`` value is used as the already-serialized
                protected header as is.
            unprotected (Optional[dict]): Parameters that are not cryptographically protected.
            recipients (Optional[List[RecipientInterface]]): A list of recipient information structures.
            external_aad(bytes): External additional authenticated data supplied by application.
            out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
                ``"cbor2/CBORTag"`` is specified, this function will return encoded data as
                `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
                If any other value is specified, it will return encoded data as bytes.
        Returns:
            Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object.
        Raises:
            ValueError: Invalid arguments.
            EncodeError: Failed to encode data.
            NotImplementedError: The algorithm of the first recipient is not
                listed in ``COSE_ALGORITHMS_RECIPIENT``.
        """
        p: Union[Dict[int, Any], bytes] = (
            to_cose_header(protected) if not isinstance(protected, bytes) else protected
        )
        u = to_cose_header(unprotected)

        # MAC0
        if not recipients:
            if not isinstance(p, bytes) and self._alg_auto_inclusion:
                p[1] = key.alg
            if self._kid_auto_inclusion and key.kid:
                u[4] = key.kid
            # An empty protected map is emitted as a zero-length byte string,
            # the same way every other structure in this class encodes it.
            b_protected = self._dumps_protected(p)
            mac_structure = ["MAC0", b_protected, external_aad, payload]
            tag = key.sign(self._dumps(mac_structure))
            res = CBORTag(17, [b_protected, u, payload, tag])
            return res if out == "cbor2/CBORTag" else self._dumps(res)

        # MAC
        recs = [rec.to_list() for rec in recipients]
        if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values():
            raise NotImplementedError(
                "Algorithms other than direct are not supported for recipients."
            )
        if not isinstance(p, bytes) and self._alg_auto_inclusion:
            p[1] = key.alg
        if self._kid_auto_inclusion and key.kid:
            u[4] = key.kid
        b_protected = self._dumps_protected(p)
        mac_structure = ["MAC", b_protected, external_aad, payload]
        tag = key.sign(self._dumps(mac_structure))
        res = CBORTag(97, [b_protected, u, payload, tag, recs])
        return res if out == "cbor2/CBORTag" else self._dumps(res)

    def encode_and_sign(
        self,
        payload: bytes,
        key: Optional[COSEKeyInterface] = None,
        protected: Optional[Union[dict, bytes]] = None,
        unprotected: Optional[dict] = None,
        signers: Optional[List[Signer]] = None,
        external_aad: bytes = b"",
        out: str = "",
    ) -> Union[bytes, CBORTag]:
        """
        Encodes data with signing.

        With a ``key`` and no ``signers`` a Signature1 structure (CBOR tag 18)
        is produced; otherwise a Signature structure (CBOR tag 98) carrying one
        entry per signer is produced. Note that when neither ``key`` nor
        ``signers`` is given, the tag 98 structure is emitted with an empty
        signature list.

        Args:
            payload (bytes): A content to be signed.
            key (Optional[COSEKeyInterface]): A signing key for single signer
                cases. When the ``signers`` parameter is set, this ``key`` will
                be ignored and should not be set.
            protected (Optional[Union[dict, bytes]]): Parameters that are to be
                cryptographically protected.
            unprotected (Optional[dict]): Parameters that are not cryptographically
                protected.
            signers (Optional[List[Signer]]): A list of signer information objects
                for multiple signer cases. ``None`` is treated as an empty list.
            external_aad(bytes): External additional authenticated data supplied
                by application.
            out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
                ``"cbor2/CBORTag"`` is specified, this function will return encoded
                data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
                ``CBORTag`` object. If any other value is specified, it will return
                encoded data as bytes.
        Returns:
            Union[bytes, CBORTag]: A byte string of the encoded COSE or a
                cbor2.CBORTag object.
        Raises:
            ValueError: Invalid arguments.
            EncodeError: Failed to encode data.
        """
        # Normalise here instead of using a shared mutable default argument.
        signer_list: List[Signer] = signers if signers else []

        p: Union[Dict[int, Any], bytes] = (
            to_cose_header(protected) if not isinstance(protected, bytes) else protected
        )
        u = to_cose_header(unprotected)
        single_signer = not signer_list and key is not None

        # Header auto-inclusion only applies to the single signer case; the
        # headers must be final before the protected bucket is serialized.
        if single_signer:
            if isinstance(p, dict) and self._alg_auto_inclusion:
                p[1] = key.alg
            if self._kid_auto_inclusion and key.kid:
                u[4] = key.kid

        b_protected = self._dumps_protected(p)

        # Signature1
        if single_signer:
            sig_structure = ["Signature1", b_protected, external_aad, payload]
            sig = key.sign(self._dumps(sig_structure))
            res = CBORTag(18, [b_protected, u, payload, sig])
            return res if out == "cbor2/CBORTag" else self._dumps(res)

        # Signature
        sigs = []
        for s in signer_list:
            sig_structure = [
                "Signature",
                b_protected,
                s.protected,
                external_aad,
                payload,
            ]
            # Signer.sign() stores its result on the signer; it is read back
            # through ``s.signature`` below.
            s.sign(self._dumps(sig_structure))
            sigs.append([s.protected, s.unprotected, s.signature])
        res = CBORTag(98, [b_protected, u, payload, sigs])
        return res if out == "cbor2/CBORTag" else self._dumps(res)

    def encode_and_encrypt(
        self,
        payload: bytes,
        key: COSEKeyInterface,
        protected: Optional[Union[dict, bytes]] = None,
        unprotected: Optional[dict] = None,
        nonce: bytes = b"",
        recipients: Optional[List[RecipientInterface]] = None,
        external_aad: bytes = b"",
        out: str = "",
    ) -> Union[bytes, CBORTag]:
        """
        Encodes data with encryption.

        Without ``recipients`` an Encrypt0 structure (CBOR tag 16) is produced;
        with ``recipients`` an Encrypt structure (CBOR tag 96) is produced. The
        nonce is always stored in the unprotected header under label 5.

        Args:
            payload (bytes): A content to be encrypted.
            key (COSEKeyInterface): A COSE key as an encryption key.
            protected (Optional[Union[dict, bytes]]): Parameters that are to be
                cryptographically protected.
            unprotected (Optional[dict]): Parameters that are not cryptographically
                protected.
            nonce (bytes): A nonce for encryption. When empty, one is requested
                from ``key.generate_nonce()``.
            recipients (Optional[List[RecipientInterface]]): A list of recipient
                information structures.
            external_aad(bytes): External additional authenticated data supplied
                by application.
            out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If
                ``"cbor2/CBORTag"`` is specified, this function will return encoded
                data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
                ``CBORTag`` object. If any other value is specified, it will return
                encoded data as bytes.
        Returns:
            Union[bytes, CBORTag]: A byte string of the encoded COSE or a
                cbor2.CBORTag object.
        Raises:
            ValueError: Invalid arguments.
            EncodeError: Failed to encode data.
            NotImplementedError: The algorithm of the first recipient is not
                listed in ``COSE_ALGORITHMS_RECIPIENT``.
        """
        p: Union[Dict[int, Any], bytes] = (
            to_cose_header(protected) if not isinstance(protected, bytes) else protected
        )
        u = to_cose_header(unprotected)

        if not nonce:
            try:
                nonce = key.generate_nonce()
            except NotImplementedError as err:
                raise ValueError(
                    "Nonce generation is not supported for the key. Set a nonce explicitly."
                ) from err

        # Encrypt0
        if not recipients:
            if not isinstance(p, bytes) and self._alg_auto_inclusion:
                p[1] = key.alg
            if self._kid_auto_inclusion and key.kid:
                u[4] = key.kid
            u[5] = nonce
            b_protected = self._dumps_protected(p)
            aad = self._dumps(["Encrypt0", b_protected, external_aad])
            ciphertext = key.encrypt(payload, nonce, aad)
            res = CBORTag(16, [b_protected, u, ciphertext])
            return res if out == "cbor2/CBORTag" else self._dumps(res)

        # Encrypt
        recs = [rec.to_list() for rec in recipients]
        if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values():
            raise NotImplementedError(
                "Algorithms other than direct are not supported for recipients."
            )
        if not isinstance(p, bytes) and self._alg_auto_inclusion:
            p[1] = key.alg
        if self._kid_auto_inclusion and key.kid:
            u[4] = key.kid
        u[5] = nonce
        b_protected = self._dumps_protected(p)
        aad = self._dumps(["Encrypt", b_protected, external_aad])
        ciphertext = key.encrypt(payload, nonce, aad)
        res = CBORTag(96, [b_protected, u, ciphertext, recs])
        return res if out == "cbor2/CBORTag" else self._dumps(res)

    def decode(
        self,
        data: Union[bytes, CBORTag],
        keys: Union[COSEKeyInterface, List[COSEKeyInterface]],
        context: Optional[Union[Dict[str, Any], List[Any]]] = None,
        external_aad: bytes = b"",
    ) -> bytes:
        """
        Verifies and decodes COSE data.

        For structures without recipients (tags 16, 17, 18) and for each entry
        of a Signature structure (tag 98), candidate keys are tried in order:
        only those whose ``kid`` equals the message ``kid`` when one is present,
        otherwise every key. The first key that succeeds wins; if none does,
        the last error raised by a key is re-raised. For tag 98 the payload is
        returned as soon as any single signature verifies.

        Args:
            data (Union[bytes, CBORTag]): A byte string or cbor2.CBORTag of an
                encoded data.
            keys (Union[COSEKeyInterface, List[COSEKeyInterface]]): COSE key(s)
                to verify and decrypt the encoded data.
            context (Optional[Union[Dict[str, Any], List[Any]]]): A context information
                structure for key derivation functions.
            external_aad(bytes): External additional authenticated data supplied by
                application.
        Returns:
            bytes: A byte string of decoded payload.
        Raises:
            ValueError: Invalid arguments.
            DecodeError: Failed to decode data.
            VerifyError: Failed to verify data.
        """
        if isinstance(data, bytes):
            data = self._loads(data)
        if not isinstance(data, CBORTag):
            raise ValueError("Invalid COSE format.")
        if not isinstance(keys, list):
            if not isinstance(keys, COSEKeyInterface):
                raise ValueError("key in keys should have COSEKeyInterface.")
            keys = [keys]

        spec = self._TAG_SPECS.get(data.tag)
        if spec is None:
            raise ValueError(f"Unsupported or unknown CBOR tag({data.tag}).")
        key_op, expected_len, name = spec
        keys = self._filter_by_key_ops(keys, key_op)
        if not isinstance(data.value, list) or len(data.value) != expected_len:
            raise ValueError(f"Invalid {name} format.")

        # The raw protected bytes (data.value[0]) are what gets authenticated;
        # the decoded map is only used to look up ``alg`` and ``kid``.
        protected = self._loads(data.value[0]) if data.value[0] else b""
        unprotected = data.value[1]
        if not isinstance(unprotected, dict):
            raise ValueError("unprotected header should be dict.")
        alg = self._get_alg(protected)

        # Raised when no candidate key was tried at all; otherwise replaced by
        # the last failure reported by a key.
        err: Exception = ValueError("key is not found.")

        # Encrypt0
        if data.tag == 16:
            kid = self._get_kid(protected, unprotected)
            aad = self._dumps(["Encrypt0", data.value[0], external_aad])
            nonce = unprotected.get(5, None)
            for k in self._keys_for_kid(keys, kid):
                try:
                    return k.decrypt(data.value[2], nonce, aad)
                except Exception as e:
                    err = e
            raise err

        # Encrypt
        if data.tag == 96:
            aad = self._dumps(["Encrypt", data.value[0], external_aad])
            nonce = unprotected.get(5, None)
            rs = Recipients.from_list(data.value[3], self._verify_kid)
            enc_key = rs.extract(keys, context, alg)
            return enc_key.decrypt(data.value[2], nonce, aad)

        # MAC0
        if data.tag == 17:
            kid = self._get_kid(protected, unprotected)
            msg = self._dumps(["MAC0", data.value[0], external_aad, data.value[2]])
            for k in self._keys_for_kid(keys, kid):
                try:
                    k.verify(msg, data.value[3])
                    return data.value[2]
                except Exception as e:
                    err = e
            raise err

        # MAC
        if data.tag == 97:
            to_be_maced = self._dumps(
                ["MAC", data.value[0], external_aad, data.value[2]]
            )
            rs = Recipients.from_list(data.value[4], self._verify_kid)
            mac_auth_key = rs.extract(keys, context, alg)
            mac_auth_key.verify(to_be_maced, data.value[3])
            return data.value[2]

        # Signature1
        if data.tag == 18:
            kid = self._get_kid(protected, unprotected)
            to_be_signed = self._dumps(
                ["Signature1", data.value[0], external_aad, data.value[2]]
            )
            for k in self._keys_for_kid(keys, kid):
                try:
                    k.verify(to_be_signed, data.value[3])
                    return data.value[2]
                except Exception as e:
                    err = e
            raise err

        # Signature (data.tag == 98 is the only tag left at this point)
        sigs = data.value[3]
        if not isinstance(sigs, list):
            raise ValueError("Invalid Signature format.")
        for sig in sigs:
            if not isinstance(sig, list) or len(sig) != 3:
                raise ValueError("Invalid Signature format.")
            sig_protected = self._loads(sig[0]) if sig[0] else b""
            sig_unprotected = sig[1]
            if not isinstance(sig_unprotected, dict):
                raise ValueError(
                    "unprotected header in signature structure should be dict."
                )
            kid = self._get_kid(sig_protected, sig_unprotected)
            # The structure to verify depends only on the signature entry, so
            # it is built once here rather than once per candidate key.
            to_be_signed = self._dumps(
                [
                    "Signature",
                    data.value[0],
                    sig[0],
                    external_aad,
                    data.value[2],
                ]
            )
            for k in self._keys_for_kid(keys, kid):
                try:
                    k.verify(to_be_signed, sig[2])
                    return data.value[2]
                except Exception as e:
                    err = e
        raise err

    def _dumps_protected(self, p: Union[Dict[int, Any], bytes]) -> bytes:
        """
        Serializes a protected header bucket.

        Bytes are passed through untouched, a non-empty map is CBOR-encoded
        and an empty map becomes a zero-length byte string.
        """
        if isinstance(p, bytes):
            return p
        return self._dumps(p) if p else b""

    def _keys_for_kid(
        self, keys: List[COSEKeyInterface], kid: bytes
    ) -> List[COSEKeyInterface]:
        """
        Returns the keys to try for a message: those whose ``kid`` equals
        ``kid`` when it is non-empty, otherwise all of ``keys``.
        """
        if not kid:
            return keys
        return [k for k in keys if k.kid == kid]

    def _filter_by_key_ops(
        self, keys: List[COSEKeyInterface], op: int
    ) -> List[COSEKeyInterface]:
        """
        Returns the keys whose ``key_ops`` contain ``op``. When no key
        qualifies, the unfiltered ``keys`` list is returned instead.
        """
        res: List[COSEKeyInterface] = [k for k in keys if op in k.key_ops]
        return res if res else keys

    def _get_alg(self, protected: Any) -> int:
        """
        Returns the value under label 1 of the decoded protected header, or 0
        when the header is not a map or has no such entry.
        """
        return protected[1] if isinstance(protected, dict) and 1 in protected else 0

    def _get_kid(self, protected: Any, unprotected: dict) -> bytes:
        """
        Returns the value under label 4, preferring the protected header over
        the unprotected one, or ``b""`` when neither has it.

        Raises:
            ValueError: No ``kid`` is present and ``verify_kid`` is enabled.
        """
        if isinstance(protected, dict) and 4 in protected:
            return protected[4]
        if 4 in unprotected:
            return unprotected[4]
        if self._verify_kid:
            raise ValueError("kid should be specified.")
        return b""