Source code for cwt.recipient

from typing import Any, Dict, List, Optional, Union

from .cbor_processor import CBORProcessor
from .cose_key import COSEKey


[docs]class Recipient(CBORProcessor): """ A COSE Recipient. """ def __init__( self, protected: Union[bytes, Dict[int, Any]] = {}, unprotected: Dict[int, Any] = {}, ciphertext: bytes = b"", recipients: List[Any] = [], ): # Validate unprotected if 1 in unprotected: alg = unprotected[1] if alg == -6: # direct if len(protected) != 0: raise ValueError("protected header should be empty.") if len(ciphertext) != 0: raise ValueError("ciphertext should be zero-length bytes.") if len(recipients) != 0: raise ValueError("recipients should be absent.") if protected == b"": self._protected = {} elif isinstance(protected, bytes): self._protected = self._loads(protected) else: self._protected = protected self._unprotected = unprotected self._ciphertext = ciphertext # Validate recipients self._recipients: List[Recipient] = [] if not recipients: return for recipient in recipients: if not isinstance(recipient, Recipient): raise ValueError("Invalid child recipient.") self._recipients.append(recipient) return @property def protected(self) -> Dict[int, Any]: return self._protected @property def unprotected(self) -> Dict[int, Any]: return self._unprotected @property def alg(self) -> int: return self._unprotected.get(1, 0) @property def kid(self) -> bytes: return self._unprotected.get(4, b"") @property def ciphertext(self) -> bytes: return self._ciphertext @property def recipients(self) -> Union[List[Any], None]: return self._recipients
[docs] def to_list(self) -> List[Any]: b_protected = self._dumps(self._protected) if self._protected else b"" b_ciphertext = self._ciphertext if self._ciphertext else b"" res: List[Any] = [b_protected, self._unprotected, b_ciphertext] if not self._recipients: return res children = [] for recipient in self._recipients: children.append(recipient.to_list()) res.append(children) return res
class Recipients: """ A Set of COSE Recipients. """ def __init__(self, recipients: List[Recipient]): self._recipients = recipients return def derive_key(self, keys: List[COSEKey]) -> COSEKey: """ Derive an appropriate key from recipients or keys privided as a parameter ``keys``. """ for recipient in self._recipients: if recipient.alg == -6: for k in keys: if k.kid == recipient.kid: return k raise ValueError("Failed to derive a key.") class RecipientsBuilder: """ A Recipients Builder. """ def __init__(self, options: Optional[Dict[str, Any]] = None): self._options = options return def from_list(self, recipients: List[Any]) -> Recipients: """ Create Recipient from a CBOR-like list. """ res: List[Recipient] = [] for r in recipients: res.append(self._create_recipient(r)) return Recipients(res) def _create_recipient(self, recipient: List[Any]) -> Recipient: if not isinstance(recipient, list) or ( len(recipient) != 3 and len(recipient) != 4 ): raise ValueError("Invalid recipient format.") if not isinstance(recipient[0], bytes): raise ValueError("protected header should be bytes.") if not isinstance(recipient[1], dict): raise ValueError("unprotected header should be dict.") if not isinstance(recipient[2], bytes): raise ValueError("ciphertext should be bytes.") if len(recipient) == 3: return Recipient(recipient[0], recipient[1], recipient[2]) if not isinstance(recipient[3], list): raise ValueError("recipients should be list.") recipients: List[Recipient] = [] for r in recipient[3]: recipients.append(self._create_recipient(r)) return Recipient(recipient[0], recipient[1], recipient[2], recipients)