from typing import Any, Dict, List, Optional, Tuple, Union
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.x448 import X448PublicKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
from .algs.ec2 import EC2Key
from .algs.okp import OKPKey
from .cbor_processor import CBORProcessor
from .cose_key_interface import COSEKeyInterface
class RecipientInterface(CBORProcessor):
    """
    The interface class for a COSE Recipient.

    The constructor validates and stores the header parameters, the
    ciphertext and the nested recipients. :meth:`encode` and :meth:`decode`
    raise ``NotImplementedError`` here and are meant to be overridden by
    concrete recipient implementations.
    """

    def __init__(
        self,
        protected: Optional[Dict[int, Any]] = None,
        unprotected: Optional[Dict[int, Any]] = None,
        ciphertext: bytes = b"",
        recipients: Optional[List[Any]] = None,
        key_ops: Optional[List[int]] = None,
        key: bytes = b"",
    ) -> None:
        """
        Constructor.

        Args:
            protected (Optional[Dict[int, Any]]): Parameters that are to be cryptographically
                protected. ``None`` is treated as an empty dict.
            unprotected (Optional[Dict[int, Any]]): Parameters that are not cryptographically
                protected. ``None`` is treated as an empty dict.
            ciphertext: A ciphertext encoded as bytes.
            recipients: A list of recipient information structures. Every
                element must be a ``RecipientInterface``. ``None`` is treated
                as an empty list.
            key_ops: A list of operations that the key is to be used for.
                It is accepted for interface compatibility but is not used by
                this base constructor.
            key: A body of the key as bytes.

        Raises:
            ValueError: A header parameter has an invalid type, the
                constraints of the direct algorithm (-6) are violated, or a
                child recipient is not a ``RecipientInterface``.
        """
        protected = {} if protected is None else protected
        unprotected = {} if unprotected is None else unprotected
        # None sentinel instead of a shared mutable default list.
        recipients = [] if recipients is None else recipients

        self._alg = 0

        # kid (label 4): the protected header takes precedence.
        self._kid = b""
        if 4 in protected:
            if not isinstance(protected[4], bytes):
                raise ValueError("protected[4](kid) should be bytes.")
            self._kid = protected[4]
        elif 4 in unprotected:
            if not isinstance(unprotected[4], bytes):
                raise ValueError("unprotected[4](kid) should be bytes.")
            self._kid = unprotected[4]

        # alg (label 1): the protected header takes precedence.
        if 1 in protected:
            if not isinstance(protected[1], int):
                raise ValueError("protected[1](alg) should be int.")
            self._alg = protected[1]
        elif 1 in unprotected:
            if not isinstance(unprotected[1], int):
                raise ValueError("unprotected[1](alg) should be int.")
            self._alg = unprotected[1]
            if unprotected[1] == -6:  # direct
                # A direct recipient carries nothing but its unprotected header.
                if protected:
                    raise ValueError("protected header should be empty.")
                if len(ciphertext) != 0:
                    raise ValueError("ciphertext should be zero-length bytes.")
                if len(recipients) != 0:
                    raise ValueError("recipients should be absent.")

        # iv (label 5): only the type is checked here; the value stays in the header.
        if 5 in unprotected:
            if not isinstance(unprotected[5], bytes):
                raise ValueError("unprotected[5](iv) should be bytes.")

        # Stays None until _set_b_protected() supplies encoded bytes.
        self._b_protected: Optional[bytes] = None
        self._protected = protected
        self._unprotected = unprotected
        self._ciphertext = ciphertext
        self._key = key
        # Placeholder context; presumably the COSE_KDF_Context layout - TODO confirm.
        self._context: List[Any] = [0, [None, None, None], [None, None, None], [None, None]]

        # Validate recipients
        self._recipients: List[RecipientInterface] = []
        for recipient in recipients:
            if not isinstance(recipient, RecipientInterface):
                raise ValueError("Invalid child recipient.")
            self._recipients.append(recipient)

    @property
    def kid(self) -> bytes:
        """
        The key identifier.
        """
        return self._kid

    @property
    def alg(self) -> int:
        """
        The algorithm that is used with the key.
        """
        return self._alg

    @property
    def protected(self) -> Dict[int, Any]:
        """
        The parameters that are to be cryptographically protected.
        """
        return self._protected

    @property
    def b_protected(self) -> bytes:
        """
        The binary encoded protected header.

        If no encoded form has been set via ``_set_b_protected``, the
        protected header is serialized on demand with ``self._dumps``.
        """
        if self._b_protected is None:
            return self._dumps(self._protected)
        return self._b_protected

    @property
    def unprotected(self) -> Dict[int, Any]:
        """
        The parameters that are not cryptographically protected.
        """
        return self._unprotected

    @property
    def ciphertext(self) -> bytes:
        """
        The ciphertext encoded as bytes.
        """
        return self._ciphertext

    @property
    def recipients(self) -> List[Any]:
        """
        The list of recipient information structures.
        """
        return self._recipients

    @property
    def context(self) -> List[Any]:
        """
        The recipient context information.
        """
        return self._context

    def to_list(self) -> List[Any]:
        """
        Returns the recipient information as a COSE recipient structure.

        The result is ``[protected, unprotected, ciphertext]`` where an empty
        protected header is emitted as ``b""``. A fourth element holding the
        child recipient structures is appended only when there are children.

        Returns:
            List[Any]: The recipient structure.
        """
        b_protected = self._dumps(self._protected) if self._protected else b""
        b_ciphertext = self._ciphertext or b""
        res: List[Any] = [b_protected, self._unprotected, b_ciphertext]
        if self._recipients:
            res.append([recipient.to_list() for recipient in self._recipients])
        return res

    def encode(
        self,
        plaintext: bytes = b"",
        aad: bytes = b"",
    ) -> Tuple[List[Any], Optional[COSEKeyInterface]]:
        """
        Encrypts a specified plaintext to the ciphertext in the COSE_Recipient
        structure with the recipient-specific method (e.g., key wrapping, key
        agreement, or the combination of them) and sets up the related information
        (context information or ciphertext) in the recipient structure.

        This function will be called in COSE.encode_* functions so applications
        do not need to call it directly.

        Args:
            plaintext (bytes): A plain text to be encrypted. In most of the cases,
                the plaintext is a byte string of a content encryption key.
            aad (bytes): Additional authenticated data for AEAD.

        Returns:
            Tuple[List[Any], Optional[COSEKeyInterface]]: The encoded COSE_Recipient structure
            and a derived key.

        Raises:
            NotImplementedError: Always, in this base class.
            ValueError: Invalid arguments (raised by implementations).
            EncodeError: Failed to encode(e.g., wrap, derive) the key (raised
                by implementations).
        """
        raise NotImplementedError

    def decode(
        self,
        key: COSEKeyInterface,
        aad: bytes = b"",
        alg: int = 0,
        as_cose_key: bool = False,
    ) -> Union[bytes, COSEKeyInterface]:
        """
        Decrypts the ciphertext in the COSE_Recipient structure with the
        recipient-specific method (e.g., key wrapping, key agreement,
        or the combination of them).

        This function will be called in COSE.decode so applications do not need
        to call it directly.

        Args:
            key (COSEKeyInterface): The external key to be used for
                decrypting the ciphertext in the COSE_Recipient structure.
            aad (bytes): Additional authenticated data for AEAD.
            alg (int): The algorithm of the key derived.
            as_cose_key (bool): The indicator whether the output will be returned
                as a COSEKey or not.

        Returns:
            Union[bytes, COSEKeyInterface]: The decrypted ciphertext field or The COSEKey
            converted from the decrypted ciphertext.

        Raises:
            NotImplementedError: Always, in this base class.
            ValueError: Invalid arguments (raised by implementations).
            DecodeError: Failed to decode(e.g., unwrap, derive) the key (raised
                by implementations).
        """
        raise NotImplementedError

    def _to_cose_key(self, k: Union[EllipticCurvePublicKey, X25519PublicKey, X448PublicKey]) -> Dict[int, Any]:
        """
        Converts a public key object into a COSE key dict.

        Elliptic-curve public keys are converted by ``EC2Key``; every other
        key (X25519/X448 per the annotation) is handed to ``OKPKey``.
        """
        if isinstance(k, EllipticCurvePublicKey):
            return EC2Key.to_cose_key(k)
        return OKPKey.to_cose_key(k)

    def _set_b_protected(self, b_protected: bytes) -> None:
        """
        Stores the encoded protected header that ``b_protected`` returns.
        """
        self._b_protected = b_protected