Source code for cwt.recipient

import json
from typing import Any, Dict, List, Optional, Union

import cbor2

from .const import (  # COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_WITH_KEY_WRAP,
    COSE_ALGORITHMS_CKDM_KEY_AGREEMENT,
    COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_DIRECT,
    COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_WITH_KEY_WRAP,
    COSE_ALGORITHMS_KEY_WRAP,
    COSE_ALGORITHMS_RECIPIENT,
)
from .cose_key import COSEKey
from .cose_key_interface import COSEKeyInterface
from .recipient_algs.aes_key_wrap import AESKeyWrap
from .recipient_algs.direct_hkdf import DirectHKDF
from .recipient_algs.direct_key import DirectKey
from .recipient_algs.ecdh_aes_key_wrap import ECDH_AESKeyWrap
from .recipient_algs.ecdh_direct_hkdf import ECDH_DirectHKDF
from .recipient_interface import RecipientInterface
from .utils import parse_apu, parse_apv, to_cose_header


[docs]class Recipient: """ A :class:`RecipientInterface <cwt.RecipientInterface>` Builder. """
[docs] @classmethod def new( cls, protected: dict = {}, unprotected: dict = {}, ciphertext: bytes = b"", recipients: List[Any] = [], sender_key: Optional[COSEKeyInterface] = None, ) -> RecipientInterface: """ Creates a recipient from a CBOR-like dictionary with numeric keys. Args: protected (dict): Parameters that are to be cryptographically protected. unprotected (dict): Parameters that are not cryptographically protected. ciphertext (List[Any]): A cipher text. sender_key (Optional[COSEKeyInterface]): A sender key as COSEKey. Returns: RecipientInterface: A recipient object. Raises: ValueError: Invalid arguments. """ p = to_cose_header(protected, algs=COSE_ALGORITHMS_RECIPIENT) u = to_cose_header(unprotected, algs=COSE_ALGORITHMS_RECIPIENT) alg = u[1] if 1 in u else p.get(1, 0) if alg == 0: raise ValueError("alg should be specified.") if alg == -6: return DirectKey(u, ciphertext, recipients) if alg in [-10, -11]: return DirectHKDF(p, u, ciphertext, recipients) if alg in [-3, -4, -5]: if not sender_key: sender_key = COSEKey.from_symmetric_key(alg=alg) return AESKeyWrap(p, u, sender_key, ciphertext, recipients) if alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_DIRECT.values(): return ECDH_DirectHKDF(p, u, ciphertext, recipients, sender_key) if alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT_WITH_KEY_WRAP.values(): return ECDH_AESKeyWrap(p, u, ciphertext, recipients, sender_key) raise ValueError(f"Unsupported or unknown alg(1): {alg}.")
[docs] @classmethod def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]) -> RecipientInterface: """ Creates a recipient from JWK-like data. Args: data (Union[str, bytes, Dict[str, Any]]): JSON-formatted recipient data. Returns: RecipientInterface: A recipient object. Raises: ValueError: Invalid arguments. DecodeError: Failed to decode the key data. """ protected: Dict[int, Any] = {} unprotected: Dict[int, Any] = {} recipient: Dict[str, Any] if not isinstance(data, dict): recipient = json.loads(data) else: recipient = data # alg sender_key = None if "alg" not in recipient: raise ValueError("alg should be specified.") if not isinstance(recipient["alg"], str): raise ValueError("alg should be str.") if recipient["alg"] not in COSE_ALGORITHMS_RECIPIENT: raise ValueError(f"Unsupported or unknown alg: {recipient['alg']}.") if recipient["alg"] == "direct": unprotected[1] = COSE_ALGORITHMS_RECIPIENT[recipient["alg"]] elif recipient["alg"] in COSE_ALGORITHMS_KEY_WRAP: unprotected[1] = COSE_ALGORITHMS_RECIPIENT[recipient["alg"]] sender_key = COSEKey.from_jwk(recipient) else: protected[1] = COSE_ALGORITHMS_RECIPIENT[recipient["alg"]] if recipient["alg"] in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT.keys(): sender_key = COSEKey.from_jwk(recipient) # kid if "kid" in recipient: if not isinstance(recipient["kid"], (str, bytes)): raise ValueError("kid should be str or bytes.") if isinstance(recipient["kid"], str): unprotected[4] = recipient["kid"].encode("utf-8") else: unprotected[4] = recipient["kid"] # salt if "salt" in recipient: if not isinstance(recipient["salt"], str): raise ValueError("salt should be str.") unprotected[-20] = recipient["salt"].encode("utf-8") # context if "context" in recipient: if not isinstance(recipient["context"], dict): raise ValueError("context should be dict.") apu = parse_apu(recipient["context"]) if apu[0]: unprotected[-21] = apu[0] if apu[1]: unprotected[-22] = apu[1] if apu[2]: unprotected[-23] = apu[2] apv = parse_apv(recipient["context"]) if apv[0]: unprotected[-24] = apv[0] if apv[1]: unprotected[-25] = apv[1] if apv[2]: unprotected[-26] = apv[2] return cls.new(protected, unprotected, sender_key=sender_key)
[docs] @classmethod def from_list(cls, recipient: List[Any]) -> RecipientInterface: """ Creates a recipient from a raw COSE array data. Args: data (Union[str, bytes, Dict[str, Any]]): JSON-formatted recipient data. Returns: RecipientInterface: A recipient object. Raises: ValueError: Invalid arguments. DecodeError: Failed to decode the key data. """ if not isinstance(recipient, list) or ( len(recipient) != 3 and len(recipient) != 4 ): raise ValueError("Invalid recipient format.") if not isinstance(recipient[0], bytes): raise ValueError("protected header should be bytes.") protected = {} if not recipient[0] else cbor2.loads(recipient[0]) if not isinstance(recipient[1], dict): raise ValueError("unprotected header should be dict.") if not isinstance(recipient[2], bytes): raise ValueError("ciphertext should be bytes.") if len(recipient) == 3: return Recipient.new(protected, recipient[1], recipient[2]) if not isinstance(recipient[3], list): raise ValueError("recipients should be list.") recipients: List[RecipientInterface] = [] for r in recipient[3]: recipients.append(cls.from_list(r)) return cls.new(protected, recipient[1], recipient[2], recipients)