from typing import Any, Dict, List, Optional, Union
import cbor2
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
load_pem_public_key,
)
from .algs.ec2 import EC2Key
from .algs.okp import OKPKey
from .algs.raw import RawKey
from .algs.rsa import RSAKey
from .algs.symmetric import (
AESCBCKey,
AESCCMKey,
AESCTRKey,
AESGCMKey,
AESKeyWrap,
ChaCha20Key,
HMACKey,
)
from .const import (
COSE_ALGORITHMS_CKDM_KEY_AGREEMENT,
COSE_ALGORITHMS_RSA,
COSE_ALGORITHMS_SIG_EC2,
COSE_ALGORITHMS_SIG_OKP,
COSE_ALGORITHMS_SYMMETRIC,
COSE_KEY_OPERATION_VALUES,
COSE_KEY_TYPES,
)
from .cose_key_interface import COSEKeyInterface
from .enums import COSEKeyParams
from .utils import jwk_to_cose_key_params, uint_to_bytes
[docs]
class COSEKey:
"""
A :class:`COSEKeyInterface <cwt.COSEKeyInterface>` Builder.
"""
[docs]
@staticmethod
def new(params: Dict[int, Any]) -> COSEKeyInterface:
"""
Creates a COSE_Key object from a COSE_Key structure, which is a dictionary with numeric keys.
Args:
params (Dict[int, Any]): A dictionary with numeric keys of a COSE key.
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
"""
# Validate COSE Key common parameters.
if COSEKeyParams.KTY not in params:
raise ValueError("kty(1) not found.")
if not isinstance(params[COSEKeyParams.KTY], int) and not isinstance(params[COSEKeyParams.KTY], str):
raise ValueError("kty(1) should be int or str(tstr).")
if params[COSEKeyParams.KTY] == 1:
return OKPKey(params)
if params[COSEKeyParams.KTY] == 2:
return EC2Key(params)
if params[COSEKeyParams.KTY] == 3:
return RSAKey(params)
if params[COSEKeyParams.KTY] != 4:
raise ValueError(f"Unsupported or unknown kty(1): {params[1]}.")
if COSEKeyParams.ALG not in params:
raise ValueError("alg(3) not found.")
if not isinstance(params[COSEKeyParams.ALG], int) and not isinstance(params[COSEKeyParams.ALG], str):
raise ValueError("alg(3) should be int or str(tstr).")
if params[COSEKeyParams.ALG] in [1, 2, 3]:
return AESGCMKey(params)
if params[COSEKeyParams.ALG] in [4, 5, 6, 7]:
return HMACKey(params)
if params[COSEKeyParams.ALG] in [10, 11, 12, 13, 30, 31, 32, 33]:
return AESCCMKey(params)
if params[COSEKeyParams.ALG] == 24:
return ChaCha20Key(params)
if params[COSEKeyParams.ALG] in [-3, -4, -5]:
return AESKeyWrap(params)
if params[COSEKeyParams.ALG] in [-65534, -65533, -65532]:
return AESCTRKey(params)
if params[COSEKeyParams.ALG] in [-65531, -65530, -65529]:
return AESCBCKey(params)
raise ValueError(f"Unsupported or unknown alg(3): {params[3]}.")
[docs]
@classmethod
def generate_symmetric_key(
cls,
alg: Union[int, str] = "",
kid: Union[bytes, str] = b"",
key_ops: Optional[Union[List[int], List[str]]] = None,
) -> COSEKeyInterface:
"""
Generates a symmetric COSE key from from a randomly genarated byte string.
Args:
alg (Union[int, str]): An algorithm label(int) or name(str).
Supported ``alg`` are listed in
`Supported COSE Algorithms <https://python-cwt.readthedocs.io/en/stable/algorithms.html>`_.
kid (Union[bytes, str]): A key identifier.
key_ops (Union[List[int], List[str]]): A list of key operation values.
Following values can be used:
``1("sign")``, ``2("verify")``, ``3("encrypt")``, ``4("decrypt")``, ``5("wrap key")``,
``6("unwrap key")``, ``7("derive key")``, ``8("derive bits")``,
``9("MAC create")``, ``10("MAC verify")``
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
"""
return cls.from_symmetric_key(b"", alg, kid, key_ops)
[docs]
@classmethod
def from_symmetric_key(
cls,
key: Union[bytes, str] = b"",
alg: Union[int, str] = "",
kid: Union[bytes, str] = b"",
key_ops: Optional[Union[List[int], List[str]]] = None,
) -> COSEKeyInterface:
"""
Creates a COSE key from a symmetric key.
Args:
key (Union[bytes, str]): A key bytes or string.
alg (Union[int, str]): An algorithm label(int) or name(str).
Supported ``alg`` are listed in
`Supported COSE Algorithms <https://python-cwt.readthedocs.io/en/stable/algorithms.html>`_.
kid (Union[bytes, str]): A key identifier.
key_ops (Union[List[int], List[str]]): A list of key operation values.
Following values can be used:
``1("sign")``, ``2("verify")``, ``3("encrypt")``, ``4("decrypt")``, ``5("wrap key")``,
``6("unwrap key")``, ``7("derive key")``, ``8("derive bits")``,
``9("MAC create")``, ``10("MAC verify")``
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
"""
if isinstance(key, str):
key = key.encode("utf-8")
if alg == "":
return RawKey({1: 4, -1: key})
alg_id = alg if isinstance(alg, int) else COSE_ALGORITHMS_SYMMETRIC.get(alg, 0)
if alg_id == 0:
raise ValueError(f"Unsupported or unknown alg(3): {alg}.")
params: Dict[int, Any] = {
COSEKeyParams.KTY: 4, # kty: 'Symmetric'
COSEKeyParams.ALG: alg_id, # alg: int
COSEKeyParams.K: key, # k: bstr
}
if isinstance(kid, str):
kid = kid.encode("utf-8")
if kid:
params[2] = kid
key_ops_labels: List[int] = []
if key_ops and isinstance(key_ops, list):
try:
for ops in key_ops:
if isinstance(ops, str):
key_ops_labels.append(COSE_KEY_OPERATION_VALUES[ops])
else:
key_ops_labels.append(ops)
except Exception:
raise ValueError("Unsupported or unknown key_ops.")
params[4] = key_ops_labels
return cls.new(params)
[docs]
@classmethod
def from_bytes(cls, key_data: bytes) -> COSEKeyInterface:
"""
Creates a COSE key from CBOR-formatted key data.
Args:
key_data (bytes): CBOR-formatted key data.
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
DecodeError: Failed to decode the key data.
"""
params = cbor2.loads(key_data)
return cls.new(params)
[docs]
@classmethod
def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]) -> COSEKeyInterface:
"""
Creates a COSE key from JWK (JSON Web Key).
Args:
jwk (Union[str, bytes, Dict[str, Any]]): JWK-formatted key data.
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
DecodeError: Failed to decode the key data.
"""
return cls.new(jwk_to_cose_key_params(data))
[docs]
@classmethod
def from_pem(
cls,
key_data: Union[str, bytes],
alg: Union[int, str] = "",
kid: Union[bytes, str] = b"",
key_ops: Optional[Union[List[int], List[str]]] = None,
) -> COSEKeyInterface:
"""
Creates a COSE key from PEM-formatted key data.
Args:
key_data (bytes): A PEM-formatted key data.
alg (Union[int, str]): An algorithm label(int) or name(str).
Different from ::func::`cwt.COSEKey.from_symmetric_key`, it is only used when an algorithm
cannot be specified by the PEM data, such as RSA family algorithms.
kid (Union[bytes, str]): A key identifier.
key_ops (Union[List[int], List[str]]): A list of key operation values. Following values can be used:
``1("sign")``, ``2("verify")``, ``3("encrypt")``, ``4("decrypt")``, ``5("wrap key")``,
``6("unwrap key")``, ``7("derive key")``, ``8("derive bits")``,
``9("MAC create")``, ``10("MAC verify")``
Returns:
COSEKeyInterface: A COSE key object.
Raises:
ValueError: Invalid arguments.
DecodeError: Failed to decode the key data.
"""
if isinstance(key_data, str):
key_data = key_data.encode("utf-8")
key_str = key_data.decode("utf-8")
k: Any = None
if "BEGIN PUBLIC" in key_str:
k = load_pem_public_key(key_data)
elif "BEGIN CERTIFICATE" in key_str:
k = x509.load_pem_x509_certificate(key_data).public_key()
elif "BEGIN PRIVATE" in key_str:
k = load_pem_private_key(key_data, password=None)
elif "BEGIN EC PRIVATE" in key_str:
k = load_pem_private_key(key_data, password=None)
else:
raise ValueError("Failed to decode PEM.")
params: Dict[int, Any] = {}
if isinstance(kid, str):
kid = kid.encode("utf-8")
if kid:
params[2] = kid
key_ops_labels: List[int] = []
if key_ops and isinstance(key_ops, list):
try:
for ops in key_ops:
if isinstance(ops, str):
key_ops_labels.append(COSE_KEY_OPERATION_VALUES[ops])
else:
key_ops_labels.append(ops)
except Exception:
raise ValueError("Unsupported or unknown key_ops.")
params[4] = key_ops_labels
if isinstance(k, RSAPublicKey) or isinstance(k, RSAPrivateKey):
if not alg:
raise ValueError("alg parameter should be specified for an RSA key.")
if isinstance(alg, str):
if alg not in COSE_ALGORITHMS_RSA:
raise ValueError(f"Unsupported or unknown alg: {alg}.")
alg = COSE_ALGORITHMS_RSA[alg]
params[1] = COSE_KEY_TYPES["RSA"]
params[3] = alg
if isinstance(k, RSAPublicKey):
pub_nums = k.public_numbers()
params[-1] = uint_to_bytes(pub_nums.n)
params[-2] = uint_to_bytes(pub_nums.e)
else:
priv_nums = k.private_numbers()
params[-1] = uint_to_bytes(priv_nums.public_numbers.n)
params[-2] = uint_to_bytes(priv_nums.public_numbers.e)
params[-3] = uint_to_bytes(priv_nums.d)
params[-4] = uint_to_bytes(priv_nums.p)
params[-5] = uint_to_bytes(priv_nums.q)
params[-6] = uint_to_bytes(priv_nums.dmp1) # dP
params[-7] = uint_to_bytes(priv_nums.dmq1) # dQ
params[-8] = uint_to_bytes(priv_nums.iqmp) # qInv
elif isinstance(k, EllipticCurvePrivateKey) or isinstance(k, EllipticCurvePublicKey):
if alg:
if isinstance(alg, str):
if alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT:
alg = COSE_ALGORITHMS_CKDM_KEY_AGREEMENT[alg]
elif alg in COSE_ALGORITHMS_SIG_EC2:
alg = COSE_ALGORITHMS_SIG_EC2[alg]
else:
raise ValueError(f"Unsupported or unknown alg for EC2: {alg}.")
params[3] = alg
params.update(EC2Key.to_cose_key(k))
else:
if alg:
if isinstance(alg, str):
if alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT:
alg = COSE_ALGORITHMS_CKDM_KEY_AGREEMENT[alg]
elif alg in COSE_ALGORITHMS_SIG_OKP:
alg = COSE_ALGORITHMS_SIG_OKP[alg]
else:
raise ValueError(f"Unsupported or unknown alg for OKP: {alg}.")
params[3] = alg
params.update(OKPKey.to_cose_key(k))
return cls.new(params)