import json
from typing import Any, Dict, List, Optional, Union
import cbor2
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
)
from cryptography.hazmat.primitives.asymmetric.ed448 import (
Ed448PrivateKey,
Ed448PublicKey,
)
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
load_pem_private_key,
load_pem_public_key,
)
from .algs.ec2 import EC2Key
from .algs.okp import OKPKey
from .algs.rsa import RSAKey
from .algs.symmetric import AESCCMKey, AESGCMKey, ChaCha20Key, HMACKey
from .const import (
COSE_ALGORITHMS_RSA,
COSE_ALGORITHMS_SYMMETRIC,
COSE_KEY_OPERATION_VALUES,
COSE_KEY_TYPES,
COSE_NAMED_ALGORITHMS_SUPPORTED,
JWK_ELLIPTIC_CURVES,
JWK_OPERATIONS,
JWK_PARAMS_EC,
JWK_PARAMS_OKP,
JWK_PARAMS_RSA,
JWK_TYPES,
)
from .cose_key_interface import COSEKeyInterface
from .utils import base64url_decode, uint_to_bytes
class COSEKey:
    """
    A :class:`COSEKeyInterface <cwt.COSEKeyInterface>` Builder.

    Every ``from_*`` factory converts its input into a CBOR-like dictionary
    with numeric labels and hands it to :func:`from_dict`, which selects the
    concrete key class from ``kty(1)`` (and ``alg(3)`` for symmetric keys).
    """

    @staticmethod
    def from_dict(cose_key: Dict[int, Any]) -> COSEKeyInterface:
        """
        Create a COSE key from a CBOR-like dictionary with numeric keys.

        Args:
            cose_key (Dict[int, Any]): A CBOR-like dictionary with numeric keys
                of a COSE key.
        Returns:
            COSEKeyInterface: A COSE key object.
        Raises:
            ValueError: Invalid arguments.
        """
        # Validate COSE Key common parameters.
        if 1 not in cose_key:
            raise ValueError("kty(1) not found.")
        kty = cose_key[1]
        if not isinstance(kty, (int, str)):
            raise ValueError("kty(1) should be int or str(tstr).")

        if kty == 1:
            return OKPKey(cose_key)
        if kty == 2:
            return EC2Key(cose_key)
        if kty == 3:
            return RSAKey(cose_key)
        if kty == 4:
            # For symmetric keys the concrete class is chosen by alg(3), so
            # alg is mandatory here.
            if 3 not in cose_key or not isinstance(cose_key[3], (int, str)):
                raise ValueError("alg(3) should be int or str(tstr).")
            alg = cose_key[3]
            if alg in (1, 2, 3):
                return AESGCMKey(cose_key)
            if alg in (4, 5, 6, 7):
                return HMACKey(cose_key)
            if alg in (10, 11, 12, 13, 30, 31, 32, 33):
                return AESCCMKey(cose_key)
            if alg == 24:
                return ChaCha20Key(cose_key)
            raise ValueError(f"Unsupported or unknown alg(3): {alg}.")
        raise ValueError(f"Unsupported or unknown kty(1): {kty}.")

    @staticmethod
    def _to_key_ops_labels(
        key_ops: Optional[Union[List[int], List[str]]],
    ) -> List[int]:
        """
        Convert a list of key operation names/labels into integer labels.

        String entries are looked up in ``COSE_KEY_OPERATION_VALUES``; any
        other entry is passed through unchanged. An empty result is returned
        when ``key_ops`` is falsy or is not a list.

        Raises:
            ValueError: A string entry is not a known key operation name.
        """
        if not key_ops or not isinstance(key_ops, list):
            return []
        try:
            return [
                COSE_KEY_OPERATION_VALUES[op] if isinstance(op, str) else op
                for op in key_ops
            ]
        except KeyError as err:
            raise ValueError("Unsupported or unknown key_ops.") from err

    @classmethod
    def from_symmetric_key(
        cls,
        key: Union[bytes, str] = b"",
        alg: Union[int, str] = "HMAC 256/256",
        kid: Union[bytes, str] = b"",
        key_ops: Optional[Union[List[int], List[str]]] = None,
    ) -> COSEKeyInterface:
        """
        Create a COSE key from a symmetric key.

        Args:
            key (Union[bytes, str]): A key bytes or string. A string is
                encoded as UTF-8.
            alg (Union[int, str]): An algorithm label(int) or name(str). Supported ``alg`` are listed
                in `Supported COSE Algorithms <https://python-cwt.readthedocs.io/en/stable/algorithms.html>`_.
            kid (Union[bytes, str]): A key identifier. A string is encoded as
                UTF-8; an empty value is omitted from the key.
            key_ops (Union[List[int], List[str]]): A list of key operation values. Following values can be used:
                ``1("sign")``, ``2("verify")``, ``3("encrypt")``, ``4("decrypt")``, ``5("wrap key")``,
                ``6("unwrap key")``, ``7("derive key")``, ``8("derive bits")``,
                ``9("MAC create")``, ``10("MAC verify")``
        Returns:
            COSEKeyInterface: A COSE key object.
        Raises:
            ValueError: Invalid arguments.
        """
        if isinstance(key, str):
            key = key.encode("utf-8")

        # 0 doubles as the "unknown name" sentinel, so an explicit alg of 0
        # is rejected as well.
        alg_id = alg if isinstance(alg, int) else COSE_ALGORITHMS_SYMMETRIC.get(alg, 0)
        if alg_id == 0:
            raise ValueError(f"Unsupported or unknown alg(3): {alg}.")

        cose_key: Dict[int, Any] = {
            1: 4,  # kty: 'Symmetric'
            3: alg_id,  # alg: int
            -1: key,  # k: bstr
        }
        if isinstance(kid, str):
            kid = kid.encode("utf-8")
        if kid:
            cose_key[2] = kid
        key_ops_labels = cls._to_key_ops_labels(key_ops)
        if key_ops_labels:
            cose_key[4] = key_ops_labels
        return cls.from_dict(cose_key)

    @classmethod
    def from_bytes(cls, key_data: bytes) -> COSEKeyInterface:
        """
        Create a COSE key from CBOR-formatted key data.

        Args:
            key_data (bytes): CBOR-formatted key data.
        Returns:
            COSEKeyInterface: A COSE key object.
        Raises:
            ValueError: Invalid arguments, including key data that does not
                decode to a CBOR map.

        Errors raised by ``cbor2.loads`` while decoding are not caught here
        and propagate to the caller.
        """
        cose_key = cbor2.loads(key_data)
        # Anything other than a map cannot be a COSE key; reject it here
        # instead of letting the membership test in from_dict fail with a
        # TypeError on non-container values.
        if not isinstance(cose_key, dict):
            raise ValueError("key_data should be a CBOR map.")
        return cls.from_dict(cose_key)

    @classmethod
    def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]) -> COSEKeyInterface:
        """
        Create a COSE key from JWK (JSON Web Key).

        Args:
            data (Union[str, bytes, Dict[str, Any]]): JWK-formatted key data,
                either already parsed into a dictionary or as a JSON document.
        Returns:
            COSEKeyInterface: A COSE key object.
        Raises:
            ValueError: Invalid arguments. This includes malformed JSON,
                because ``json.JSONDecodeError`` is a ``ValueError``.
        """
        jwk: Dict[str, Any]
        if isinstance(data, dict):
            jwk = data
        else:
            jwk = json.loads(data)
            if not isinstance(jwk, dict):
                raise ValueError("JWK should be a JSON object.")

        # kty
        if "kty" not in jwk:
            raise ValueError("kty not found.")
        if jwk["kty"] not in JWK_TYPES:
            raise ValueError(f"Unknown kty: {jwk['kty']}.")
        cose_key: Dict[int, Any] = {1: JWK_TYPES[jwk["kty"]]}
        kty = cose_key[1]

        # kid
        if "kid" in jwk:
            if not isinstance(jwk["kid"], str):
                raise ValueError("kid should be str.")
            cose_key[2] = jwk["kid"].encode("utf-8")

        # alg
        if "alg" in jwk:
            if not isinstance(jwk["alg"], str):
                raise ValueError("alg should be str.")
            if jwk["alg"] not in COSE_NAMED_ALGORITHMS_SUPPORTED:
                raise ValueError(f"Unsupported or unknown alg: {jwk['alg']}.")
            cose_key[3] = COSE_NAMED_ALGORITHMS_SUPPORTED[jwk["alg"]]

        # key type dependent conversion
        is_public = False
        if kty == 4:  # Symmetric
            if "k" not in jwk or not isinstance(jwk["k"], str):
                raise ValueError("k is not found or invalid format.")
            cose_key[-1] = base64url_decode(jwk["k"])
        elif kty == 3:  # RSA
            for name, value in jwk.items():
                if name in JWK_PARAMS_RSA:
                    cose_key[JWK_PARAMS_RSA[name]] = base64url_decode(value)
            # No private exponent (d, label -3) means a public key.
            is_public = -3 not in cose_key
        else:  # OKP/EC2
            if "crv" not in jwk:
                raise ValueError("crv not found.")
            if jwk["crv"] not in JWK_ELLIPTIC_CURVES:
                raise ValueError(f"Unknown crv: {jwk['crv']}.")
            cose_key[-1] = JWK_ELLIPTIC_CURVES[jwk["crv"]]
            params = JWK_PARAMS_OKP if kty == 1 else JWK_PARAMS_EC
            for name, value in jwk.items():
                if name in params:
                    cose_key[params[name]] = base64url_decode(value)
            # No private key value (d, label -4) means a public key.
            is_public = -4 not in cose_key

        # use/key_ops
        use = 0
        if "use" in jwk:
            if jwk["use"] == "enc":
                # NOTE(review): a public key is mapped to 4 ("decrypt") and a
                # non-public key to 3 ("encrypt"), which looks inverted
                # relative to usual public-key usage. Kept as-is; confirm
                # against the key_ops validation in the key classes.
                use = 4 if is_public else 3  # 3: encrypt, 4: decrypt
            elif jwk["use"] == "sig":
                if kty == 4:
                    use = 10  # 10: MAC verify
                else:
                    use = 2 if is_public else 1  # 1: sign, 2: verify
            else:
                raise ValueError(f"Unknown use: {jwk['use']}.")

        if "key_ops" in jwk:
            if not isinstance(jwk["key_ops"], list):
                raise ValueError("key_ops should be list.")
            try:
                cose_key[4] = [JWK_OPERATIONS[op] for op in jwk["key_ops"]]
            except (KeyError, TypeError) as err:
                # TypeError covers unhashable entries such as nested lists.
                raise ValueError("Unsupported or unknown key_ops.") from err
            if use != 0 and use not in cose_key[4]:
                raise ValueError("use and key_ops are conflicted each other.")
        elif use != 0:
            # Without explicit key_ops, derive a single operation from use.
            cose_key[4] = [use]
        return cls.from_dict(cose_key)

    @classmethod
    def from_pem(
        cls,
        key_data: Union[str, bytes],
        alg: Union[int, str] = "",
        kid: Union[bytes, str] = b"",
        key_ops: Optional[Union[List[int], List[str]]] = None,
    ) -> COSEKeyInterface:
        """
        Create a COSE key from PEM-formatted key data.

        Only PEM blocks whose header contains ``BEGIN PUBLIC``,
        ``BEGIN PRIVATE`` or ``BEGIN EC PRIVATE`` are accepted, and private
        keys are loaded without a password.

        Args:
            key_data (Union[str, bytes]): A PEM-formatted key data.
            alg (Union[int, str]): An algorithm label(int) or name(str).
                Different from :func:`cwt.COSEKey.from_symmetric_key`, it is only used when an algorithm
                cannot be specified by the PEM data, such as RSA family algorithms.
            kid (Union[bytes, str]): A key identifier. A string is encoded as
                UTF-8; an empty value is omitted from the key.
            key_ops (Union[List[int], List[str]]): A list of key operation values. Following values can be used:
                ``1("sign")``, ``2("verify")``, ``3("encrypt")``, ``4("decrypt")``, ``5("wrap key")``,
                ``6("unwrap key")``, ``7("derive key")``, ``8("derive bits")``,
                ``9("MAC create")``, ``10("MAC verify")``
        Returns:
            COSEKeyInterface: A COSE key object.
        Raises:
            ValueError: Invalid arguments, or the PEM header is not one of
                the accepted kinds.
        """
        if isinstance(key_data, str):
            key_data = key_data.encode("utf-8")
        key_str = key_data.decode("utf-8")

        k: Any = None
        if "BEGIN PUBLIC" in key_str:
            k = load_pem_public_key(key_data)
        elif "BEGIN PRIVATE" in key_str or "BEGIN EC PRIVATE" in key_str:
            k = load_pem_private_key(key_data, password=None)
        else:
            raise ValueError("Failed to decode PEM.")

        cose_key: Dict[int, Any] = {}
        if isinstance(kid, str):
            kid = kid.encode("utf-8")
        if kid:
            cose_key[2] = kid
        key_ops_labels = cls._to_key_ops_labels(key_ops)
        if key_ops_labels:
            cose_key[4] = key_ops_labels

        if isinstance(k, (RSAPublicKey, RSAPrivateKey)):
            # The PEM data does not carry the algorithm for RSA keys.
            if not alg:
                raise ValueError("alg parameter should be specified for an RSA key.")
            if isinstance(alg, str):
                if alg not in COSE_ALGORITHMS_RSA:
                    raise ValueError(f"Unsupported or unknown alg: {alg}.")
                alg = COSE_ALGORITHMS_RSA[alg]
            cose_key[1] = COSE_KEY_TYPES["RSA"]
            cose_key[3] = alg
            if isinstance(k, RSAPublicKey):
                pub_nums = k.public_numbers()
                cose_key[-1] = uint_to_bytes(pub_nums.n)
                cose_key[-2] = uint_to_bytes(pub_nums.e)
            else:
                priv_nums = k.private_numbers()
                cose_key[-1] = uint_to_bytes(priv_nums.public_numbers.n)
                cose_key[-2] = uint_to_bytes(priv_nums.public_numbers.e)
                cose_key[-3] = uint_to_bytes(priv_nums.d)
                cose_key[-4] = uint_to_bytes(priv_nums.p)
                cose_key[-5] = uint_to_bytes(priv_nums.q)
                cose_key[-6] = uint_to_bytes(priv_nums.dmp1)  # dP
                cose_key[-7] = uint_to_bytes(priv_nums.dmq1)  # dQ
                cose_key[-8] = uint_to_bytes(priv_nums.iqmp)  # qInv
            return cls.from_dict(cose_key)

        if isinstance(k, (EllipticCurvePrivateKey, EllipticCurvePublicKey)):
            # curve name -> (COSE crv label, coordinate length in bytes)
            ec_curves = {
                "secp256r1": (1, 32),
                "secp384r1": (2, 48),
                "secp521r1": (3, 66),
                "secp256k1": (8, 32),
            }
            cose_key[1] = COSE_KEY_TYPES["EC2"]
            if k.curve.name not in ec_curves:
                raise ValueError(f"Unsupported or unknown curve: {k.curve.name}.")
            cose_key[-1], key_len = ec_curves[k.curve.name]

            is_private = not isinstance(k, EllipticCurvePublicKey)
            pub_nums = (k.public_key() if is_private else k).public_numbers()
            # Coordinates are left-padded to the fixed curve length.
            cose_key[-2] = pub_nums.x.to_bytes(key_len, byteorder="big")
            cose_key[-3] = pub_nums.y.to_bytes(key_len, byteorder="big")
            if is_private:
                cose_key[-4] = k.private_numbers().private_value.to_bytes(
                    key_len, byteorder="big"
                )
            return cls.from_dict(cose_key)

        # (public key type, private key type, COSE crv label)
        okp_types = (
            (Ed25519PublicKey, Ed25519PrivateKey, 6),  # Ed25519
            (Ed448PublicKey, Ed448PrivateKey, 7),  # Ed448
            (X25519PublicKey, X25519PrivateKey, 4),  # X25519
            (X448PublicKey, X448PrivateKey, 5),  # X448
        )
        for pub_type, priv_type, crv in okp_types:
            if not isinstance(k, (pub_type, priv_type)):
                continue
            cose_key[1] = COSE_KEY_TYPES["OKP"]
            # NOTE(review): alg -8 (EdDSA) is assigned to X25519/X448 keys
            # too, exactly as before; presumably a placeholder since those
            # are not signature curves. Confirm against OKPKey.
            cose_key[3] = -8  # EdDSA
            cose_key[-1] = crv
            if isinstance(k, pub_type):
                cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
            else:
                cose_key[-2] = k.public_key().public_bytes(
                    Encoding.Raw, PublicFormat.Raw
                )
                cose_key[-4] = k.private_bytes(
                    Encoding.Raw, PrivateFormat.Raw, NoEncryption()
                )
            return cls.from_dict(cose_key)

        raise ValueError(f"Unsupported or unknown key: {type(k)}.")