Compare commits

...

18 Commits

Author SHA1 Message Date
David Robertson
bfa3ae3d3b Print errors in test for confidence 2022-07-05 19:42:33 +01:00
David Robertson
44ac98422c lint fixup 2022-05-22 18:33:21 +01:00
David Robertson
245d5d66d3 ClientSecretJWTKey (needs testing) 2022-05-22 18:32:02 +01:00
David Robertson
26cb343d40 SSOAttributeRequirement 2022-05-22 18:00:10 +01:00
David Robertson
9ee0dab921 A few examples 2022-05-22 16:56:41 +01:00
David Robertson
be593787de More validation relative to discovery 2022-05-22 16:51:37 +01:00
David Robertson
fd92608b32 More generic example config 2022-05-22 16:51:17 +01:00
David Robertson
4c66f55279 A batch of linting 2022-05-22 16:26:02 +01:00
David Robertson
e2d9072543 Use pydantic plugin first
Some hooks are completely overrideen by mypy_zope, without the chance to
yield to other plugins
2022-05-22 16:15:54 +01:00
David Robertson
88f603f845 ensure idp_id gets a prefix 2022-05-21 21:42:08 +01:00
David Robertson
bbaba3c27f endpoints are required if discovery is enabled 2022-05-21 21:41:38 +01:00
David Robertson
3ba21b61a0 validate that idp_icon is an mxc_url 2022-05-21 18:21:45 +01:00
David Robertson
3097172832 legacy fallbacks seem to just work (TM)? 2022-05-21 17:58:13 +01:00
David Robertson
2179c6376a Extra fields and tests
Pleasantly: no pain here
2022-05-20 00:56:02 +01:00
David Robertson
51d9e34412 move TYPE_CHECKING workaround outside 2022-05-19 11:29:49 +01:00
David Robertson
9b9b51be6a It seems what I want is constr
but this interacts poorly with mypy :(
2022-05-19 10:29:27 +01:00
David Robertson
348b53fe9c WIP trying out validators 2022-05-19 10:29:27 +01:00
David Robertson
8f1b555ec6 Require and lock pydantic 2022-05-18 21:29:50 +01:00
5 changed files with 716 additions and 2 deletions

View File

@@ -1,6 +1,6 @@
[mypy]
namespace_packages = True
plugins = mypy_zope:plugin, scripts-dev/mypy_synapse_plugin.py
plugins = pydantic.mypy, mypy_zope:plugin, scripts-dev/mypy_synapse_plugin.py
follow_imports = normal
check_untyped_defs = True
show_error_codes = True
@@ -86,6 +86,12 @@ exclude = (?x)
|tests/utils.py
)$
[pydantic-mypy]
init_forbid_extra = True
init_typed = True
warn_required_dynamic_aliases = True
warn_untyped_fields = True
[mypy-synapse._scripts.*]
disallow_untyped_defs = True

54
poetry.lock generated
View File

@@ -778,6 +778,21 @@ category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[[package]]
name = "pydantic"
version = "1.9.0"
description = "Data validation and settings management using python 3.6 type hinting"
category = "main"
optional = false
python-versions = ">=3.6.1"
[package.dependencies]
typing-extensions = ">=3.7.4.3"
[package.extras]
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]]
name = "pyflakes"
version = "2.4.0"
@@ -1563,7 +1578,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "d39d5ac5d51c014581186b7691999b861058b569084c525523baf70b77f292b1"
content-hash = "54ec27d5187386653b8d0d13ed843f86ae68b3ebbee633c82dfffc7605b99f74"
[metadata.files]
attrs = [
@@ -2251,6 +2266,43 @@ pycparser = [
{file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"},
{file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"},
]
pydantic = [
{file = "pydantic-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cb23bcc093697cdea2708baae4f9ba0e972960a835af22560f6ae4e7e47d33f5"},
{file = "pydantic-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1d5278bd9f0eee04a44c712982343103bba63507480bfd2fc2790fa70cd64cf4"},
{file = "pydantic-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ab624700dc145aa809e6f3ec93fb8e7d0f99d9023b713f6a953637429b437d37"},
{file = "pydantic-1.9.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c8d7da6f1c1049eefb718d43d99ad73100c958a5367d30b9321b092771e96c25"},
{file = "pydantic-1.9.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:3c3b035103bd4e2e4a28da9da7ef2fa47b00ee4a9cf4f1a735214c1bcd05e0f6"},
{file = "pydantic-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:3011b975c973819883842c5ab925a4e4298dffccf7782c55ec3580ed17dc464c"},
{file = "pydantic-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:086254884d10d3ba16da0588604ffdc5aab3f7f09557b998373e885c690dd398"},
{file = "pydantic-1.9.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:0fe476769acaa7fcddd17cadd172b156b53546ec3614a4d880e5d29ea5fbce65"},
{file = "pydantic-1.9.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8e9dcf1ac499679aceedac7e7ca6d8641f0193c591a2d090282aaf8e9445a46"},
{file = "pydantic-1.9.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d1e4c28f30e767fd07f2ddc6f74f41f034d1dd6bc526cd59e63a82fe8bb9ef4c"},
{file = "pydantic-1.9.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:c86229333cabaaa8c51cf971496f10318c4734cf7b641f08af0a6fbf17ca3054"},
{file = "pydantic-1.9.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:c0727bda6e38144d464daec31dff936a82917f431d9c39c39c60a26567eae3ed"},
{file = "pydantic-1.9.0-cp36-cp36m-win_amd64.whl", hash = "sha256:dee5ef83a76ac31ab0c78c10bd7d5437bfdb6358c95b91f1ba7ff7b76f9996a1"},
{file = "pydantic-1.9.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:d9c9bdb3af48e242838f9f6e6127de9be7063aad17b32215ccc36a09c5cf1070"},
{file = "pydantic-1.9.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ee7e3209db1e468341ef41fe263eb655f67f5c5a76c924044314e139a1103a2"},
{file = "pydantic-1.9.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0b6037175234850ffd094ca77bf60fb54b08b5b22bc85865331dd3bda7a02fa1"},
{file = "pydantic-1.9.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:b2571db88c636d862b35090ccf92bf24004393f85c8870a37f42d9f23d13e032"},
{file = "pydantic-1.9.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:8b5ac0f1c83d31b324e57a273da59197c83d1bb18171e512908fe5dc7278a1d6"},
{file = "pydantic-1.9.0-cp37-cp37m-win_amd64.whl", hash = "sha256:bbbc94d0c94dd80b3340fc4f04fd4d701f4b038ebad72c39693c794fd3bc2d9d"},
{file = "pydantic-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e0896200b6a40197405af18828da49f067c2fa1f821491bc8f5bde241ef3f7d7"},
{file = "pydantic-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7bdfdadb5994b44bd5579cfa7c9b0e1b0e540c952d56f627eb227851cda9db77"},
{file = "pydantic-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:574936363cd4b9eed8acdd6b80d0143162f2eb654d96cb3a8ee91d3e64bf4cf9"},
{file = "pydantic-1.9.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c556695b699f648c58373b542534308922c46a1cda06ea47bc9ca45ef5b39ae6"},
{file = "pydantic-1.9.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:f947352c3434e8b937e3aa8f96f47bdfe6d92779e44bb3f41e4c213ba6a32145"},
{file = "pydantic-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5e48ef4a8b8c066c4a31409d91d7ca372a774d0212da2787c0d32f8045b1e034"},
{file = "pydantic-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:96f240bce182ca7fe045c76bcebfa0b0534a1bf402ed05914a6f1dadff91877f"},
{file = "pydantic-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:815ddebb2792efd4bba5488bc8fde09c29e8ca3227d27cf1c6990fc830fd292b"},
{file = "pydantic-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6c5b77947b9e85a54848343928b597b4f74fc364b70926b3c4441ff52620640c"},
{file = "pydantic-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c68c3bc88dbda2a6805e9a142ce84782d3930f8fdd9655430d8576315ad97ce"},
{file = "pydantic-1.9.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5a79330f8571faf71bf93667d3ee054609816f10a259a109a0738dac983b23c3"},
{file = "pydantic-1.9.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:f5a64b64ddf4c99fe201ac2724daada8595ada0d102ab96d019c1555c2d6441d"},
{file = "pydantic-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:a733965f1a2b4090a5238d40d983dcd78f3ecea221c7af1497b845a9709c1721"},
{file = "pydantic-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:2cc6a4cb8a118ffec2ca5fcb47afbacb4f16d0ab8b7350ddea5e8ef7bcc53a16"},
{file = "pydantic-1.9.0-py3-none-any.whl", hash = "sha256:085ca1de245782e9b46cefcf99deecc67d418737a1fd3f6a4f511344b613a5b3"},
{file = "pydantic-1.9.0.tar.gz", hash = "sha256:742645059757a56ecd886faf4ed2441b9c0cd406079c2b4bee51bcc3fbcd510a"},
]
pyflakes = [
{file = "pyflakes-2.4.0-py2.py3-none-any.whl", hash = "sha256:3bb3a3f256f4b7968c9c788781e4ff07dce46bdf12339dcda61053375426ee2e"},
{file = "pyflakes-2.4.0.tar.gz", hash = "sha256:05a85c2872edf37a4ed30b0cce2f6093e1d0581f8c19d7393122da7e25b2b24c"},

View File

@@ -182,6 +182,7 @@ hiredis = { version = "*", optional = true }
Pympler = { version = "*", optional = true }
parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
pydantic = ">=1.9.0"
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified

240
synapse/config/oidc2.py Normal file
View File

@@ -0,0 +1,240 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Mapping, Optional, Tuple
from pydantic import BaseModel, StrictBool, StrictStr, constr, validator
from pydantic.fields import ModelField
from synapse.util.stringutils import parse_and_validate_mxc_uri
# Ugly workaround for https://github.com/samuelcolvin/pydantic/issues/156. Mypy doesn't
# consider expressions like `constr(...)` to be valid types.
if TYPE_CHECKING:
IDP_ID_TYPE = str
IDP_BRAND_TYPE = str
else:
IDP_ID_TYPE = constr(
strict=True,
min_length=1,
max_length=250,
regex="^[A-Za-z0-9._~-]+$", # noqa: F722
)
IDP_BRAND_TYPE = constr(
strict=True,
min_length=1,
max_length=255,
regex="^[a-z][a-z0-9_.-]*$", # noqa: F722
)
# the following list of enum members is the same as the keys of
# authlib.oauth2.auth.ClientAuth.DEFAULT_AUTH_METHODS. We inline it
# to avoid importing authlib here.
class ClientAuthMethods(str, Enum):
# The duplication is unfortunate. 3.11 should have StrEnum though,
# and there is a backport available for 3.8.6.
client_secret_basic = "client_secret_basic"
client_secret_post = "client_secret_post"
none = "none"
class UserProfileMethod(str, Enum):
# The duplication is unfortunate. 3.11 should have StrEnum though,
# and there is a backport available for 3.8.6.
auto = "auto"
userinfo_endpoint = "userinfo_endpoint"
class SSOAttributeRequirement(BaseModel):
class Config:
# Complain if someone provides a field that's not one of those listed here.
# Pydantic suggests making your own BaseModel subclass if you want to do this,
# see https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
extra = "forbid"
attribute: StrictStr
# Note: a comment in config/oidc.py suggests that `value` may be optional. But
# The JSON schema seems to forbid this.
value: StrictStr
class ClientSecretJWTKey(BaseModel):
class Config:
extra = "forbid"
# a pem-encoded signing key
# TODO: how should we handle key_file?
key: StrictStr
# properties to include in the JWT header
# TODO: validator should enforce that jwt_header contains an 'alg'.
jwt_header: Mapping[str, str]
# properties to include in the JWT payload.
jwt_payload: Mapping[str, str] = {}
class OIDCProviderModel(BaseModel):
"""
Notes on Pydantic:
- I've used StrictStr because a plain `str` e.g. accepts integers and calls str()
on them
- pulling out constr() into IDP_ID_TYPE is a little awkward, but necessary to keep
mypy happy
-
"""
# a unique identifier for this identity provider. Used in the 'user_external_ids'
# table, as well as the query/path parameter used in the login protocol.
idp_id: IDP_ID_TYPE
@validator("idp_id")
def ensure_idp_id_prefix(cls, idp_id: str) -> str:
"""Prefix the given IDP with a prefix specific to the SSO mechanism, to avoid
clashes with other mechs (such as SAML, CAS).
We allow "oidc" as an exception so that people migrating from old-style
"oidc_config" format (which has long used "oidc" as its idp_id) can migrate to
a new-style "oidc_providers" entry without changing the idp_id for their provider
(and thereby invalidating their user_external_ids data).
"""
if idp_id != "oidc":
return "oidc-" + idp_id
return idp_id
# user-facing name for this identity provider.
idp_name: StrictStr
# Optional MXC URI for icon for this IdP.
idp_icon: Optional[StrictStr]
@validator("idp_icon")
def idp_icon_is_an_mxc_url(cls, idp_icon: str) -> str:
parse_and_validate_mxc_uri(idp_icon)
return idp_icon
# Optional brand identifier for this IdP.
idp_brand: Optional[StrictStr]
# whether the OIDC discovery mechanism is used to discover endpoints
discover: StrictBool = True
# the OIDC issuer. Used to validate tokens and (if discovery is enabled) to
# discover the provider's endpoints.
issuer: StrictStr
# oauth2 client id to use
client_id: StrictStr
# oauth2 client secret to use. if `None`, use client_secret_jwt_key to generate
# a secret.
client_secret: Optional[StrictStr]
# key to use to construct a JWT to use as a client secret. May be `None` if
# `client_secret` is set.
# TODO: test that ClientSecretJWTKey is being parsed correctly
client_secret_jwt_key: Optional[ClientSecretJWTKey]
# TODO: what is the precise relationship between client_auth_method, client_secret
# and client_secret_jwt_key? Is there anything we should enforce with a validator?
# auth method to use when exchanging the token.
# Valid values are 'client_secret_basic', 'client_secret_post' and
# 'none'.
client_auth_method: ClientAuthMethods = ClientAuthMethods.client_secret_basic
# list of scopes to request
scopes: Tuple[StrictStr, ...] = ("openid",)
# the oauth2 authorization endpoint. Required if discovery is disabled.
authorization_endpoint: Optional[StrictStr]
# the oauth2 token endpoint. Required if discovery is disabled.
token_endpoint: Optional[StrictStr]
# Normally, validators aren't run when fields don't have a value provided.
# Using validate=True ensures we run the validator even in that situation.
@validator("authorization_endpoint", "token_endpoint", always=True)
def endpoints_required_if_discovery_disabled(
cls,
endpoint_url: Optional[str],
values: Mapping[str, Any],
field: ModelField,
) -> Optional[str]:
# `if "discover" in values means: don't run our checks if "discover" didn't
# pass validation. (NB: validation order is the field definition order)
if "discover" in values and not values["discover"] and endpoint_url is None:
raise ValueError(f"{field.name} is required if discovery is disabled")
return endpoint_url
# the OIDC userinfo endpoint. Required if discovery is disabled and the
# "openid" scope is not requested.
userinfo_endpoint: Optional[StrictStr]
@validator("userinfo_endpoint", always=True)
def userinfo_endpoint_required_without_discovery_and_without_openid_scope(
cls, userinfo_endpoint: Optional[str], values: Mapping[str, Any]
) -> Optional[str]:
discovery_disabled = "discover" in values and not values["discover"]
openid_scope_not_requested = (
"scopes" in values and "openid" not in values["scopes"]
)
if (
discovery_disabled
and openid_scope_not_requested
and userinfo_endpoint is None
):
raise ValueError(
"userinfo_requirement is required if discovery is disabled and"
"the 'openid' scope is not requested"
)
return userinfo_endpoint
# URI where to fetch the JWKS. Required if discovery is disabled and the
# "openid" scope is used.
jwks_uri: Optional[StrictStr]
@validator("jwks_uri", always=True)
def jwks_uri_required_without_discovery_but_with_openid_scope(
cls, jwks_uri: Optional[str], values: Mapping[str, Any]
) -> Optional[str]:
discovery_disabled = "discover" in values and not values["discover"]
openid_scope_requested = "scopes" in values and "openid" in values["scopes"]
if discovery_disabled and openid_scope_requested and jwks_uri is None:
raise ValueError(
"jwks_uri is required if discovery is disabled and"
"the 'openid' scope is not requested"
)
return jwks_uri
# Whether to skip metadata verification
skip_verification: StrictBool = False
# Whether to fetch the user profile from the userinfo endpoint. Valid
# values are: "auto" or "userinfo_endpoint".
user_profile_method: UserProfileMethod = UserProfileMethod.auto
# whether to allow a user logging in via OIDC to match a pre-existing account
# instead of failing
allow_existing_users: StrictBool = False
# the class of the user mapping provider
# TODO there was logic for this
user_mapping_provider_class: Any # TODO: Type
# the config of the user mapping provider
# TODO
user_mapping_provider_config: Any
# required attributes to require in userinfo to allow login/registration
# TODO: wouldn't this be better expressed as a Mapping[str, str]?
attribute_requirements: Tuple[SSOAttributeRequirement, ...] = ()
class LegacyOIDCProviderModel(OIDCProviderModel):
# These fields could be omitted in the old scheme.
idp_id: IDP_ID_TYPE = "oidc"
idp_name: StrictStr = "OIDC"
# TODO
# top-level config: check we don't have any duplicate idp_ids now
# compute callback url

415
tests/config/test_oidc2.py Normal file
View File

@@ -0,0 +1,415 @@
from contextlib import contextmanager
from copy import deepcopy
from typing import Any, Dict
from unittest import TestCase
import yaml
from parameterized import parameterized
from pydantic import ValidationError
from synapse.config.oidc2 import (
ClientAuthMethods,
LegacyOIDCProviderModel,
OIDCProviderModel,
)
SAMPLE_CONFIG = yaml.safe_load(
"""
idp_id: my_idp
idp_name: My OpenID provider
idp_icon: "mxc://example.com/blahblahblah"
idp_brand: "brandy"
issuer: "https://accountns.exeample.com"
client_id: "provided-by-your-issuer"
client_secret_jwt_key:
key: DUMMY_PRIVATE_KEY
jwt_header:
alg: ES256
kid: potato123
jwt_payload:
iss: issuer456
client_auth_method: "client_secret_post"
scopes: ["name", "email", "openid"]
authorization_endpoint: https://example.com/auth/authorize?response_mode=form_post
token_endpoint: https://id.example.com/dummy_url_here
jwks_uri: "https://accounts.example.com/.well-known/jwks.json"
user_mapping_provider:
config:
email_template: "{{ user.email }}"
localpart_template: "{{ user.email|localpart_from_email }}"
confirm_localpart: true
attribute_requirements:
- attribute: userGroup
value: "synapseUsers"
"""
)
class PydanticOIDCTestCase(TestCase):
"""Examples to build confidence that pydantic is doing the validation we think
it's doing"""
# Each test gets a dummy config it can change as it sees fit
config: Dict[str, Any]
def setUp(self) -> None:
self.config = deepcopy(SAMPLE_CONFIG)
@contextmanager
def assertRaises(self, *args, **kwargs):
"""To demonstrate the example error messages generated by Pydantic, uncomment
this method."""
with super().assertRaises(*args, **kwargs) as result:
yield result
print()
print(result.exception)
def test_example_config(self):
# Check that parsing the sample config doesn't raise an error.
OIDCProviderModel.parse_obj(self.config)
def test_idp_id(self) -> None:
"""Example of using a Pydantic constr() field without a default."""
# Enforce that idp_id is required.
with self.assertRaises(ValidationError):
del self.config["idp_id"]
OIDCProviderModel.parse_obj(self.config)
# Enforce that idp_id is a string.
for bad_value in 123, None, ["a"], {"a": "b"}:
with self.assertRaises(ValidationError):
self.config["idp_id"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# Enforce a length between 1 and 250.
with self.assertRaises(ValidationError):
self.config["idp_id"] = ""
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["idp_id"] = "a" * 251
OIDCProviderModel.parse_obj(self.config)
# Enforce the regex
with self.assertRaises(ValidationError):
self.config["idp_id"] = "$"
OIDCProviderModel.parse_obj(self.config)
# What happens with a really long string of prohibited characters?
with self.assertRaises(ValidationError):
self.config["idp_id"] = "$" * 500
OIDCProviderModel.parse_obj(self.config)
def test_legacy_model(self) -> None:
"""Example of widening a field's type in a subclass."""
# Check that parsing the sample config doesn't raise an error.
LegacyOIDCProviderModel.parse_obj(self.config)
# Check we have default values for the attributes which have a legacy fallback
del self.config["idp_id"]
del self.config["idp_name"]
model = LegacyOIDCProviderModel.parse_obj(self.config)
self.assertEqual(model.idp_id, "oidc")
self.assertEqual(model.idp_name, "OIDC")
# Check we still reject bad types
for bad_value in 123, [], {}, None:
with self.assertRaises(ValidationError) as e:
self.config["idp_id"] = bad_value
self.config["idp_name"] = bad_value
LegacyOIDCProviderModel.parse_obj(self.config)
# And while we're at it, check that we spot errors in both fields
reported_bad_fields = {item["loc"] for item in e.exception.errors()}
expected_bad_fields = {("idp_id",), ("idp_name",)}
self.assertEqual(
reported_bad_fields, expected_bad_fields, e.exception.errors()
)
def test_issuer(self) -> None:
"""Example of a StrictStr field without a default."""
# Empty and nonempty strings should be accepted.
for good_value in "", "hello", "hello" * 1000, "":
self.config["issuer"] = good_value
OIDCProviderModel.parse_obj(self.config)
# Invalid types should be rejected.
for bad_value in 123, None, ["h", "e", "l", "l", "o"], {"hello": "there"}:
with self.assertRaises(ValidationError):
self.config["issuer"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# A missing issuer should be rejected.
with self.assertRaises(ValidationError):
del self.config["issuer"]
OIDCProviderModel.parse_obj(self.config)
def test_idp_brand(self) -> None:
"""Example of an Optional[StrictStr] field."""
# Empty and nonempty strings should be accepted.
for good_value in "", "hello", "hello" * 1000, "":
self.config["idp_brand"] = good_value
OIDCProviderModel.parse_obj(self.config)
# Invalid types should be rejected.
for bad_value in 123, ["h", "e", "l", "l", "o"], {"hello": "there"}:
with self.assertRaises(ValidationError):
self.config["idp_brand"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# A lack of an idp_brand is fine...
del self.config["idp_brand"]
model = OIDCProviderModel.parse_obj(self.config)
self.assertIsNone(model.idp_brand)
# ... and interpreted the same as an explicit `None`.
self.config["idp_brand"] = None
model = OIDCProviderModel.parse_obj(self.config)
self.assertIsNone(model.idp_brand)
def test_idp_icon(self) -> None:
"""Example of a field with a custom validator."""
# Test that bad types are rejected, even with our validator in place
bad_value: object
for bad_value in None, {}, [], 123, 45.6:
with self.assertRaises(ValidationError):
self.config["idp_icon"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# Test that bad strings are rejected by our validator
for bad_value in "", "notaurl", "https://example.com", "mxc://mxc://mxc://":
with self.assertRaises(ValidationError):
self.config["idp_icon"] = bad_value
OIDCProviderModel.parse_obj(self.config)
def test_discover(self) -> None:
"""Example of a StrictBool field with a default."""
# Booleans are permitted.
for value in True, False:
self.config["discover"] = value
model = OIDCProviderModel.parse_obj(self.config)
self.assertEqual(model.discover, value)
# Invalid types should be rejected.
for bad_value in (
-1.0,
0,
1,
float("nan"),
"yes",
"NO",
"True",
"true",
None,
"None",
"null",
["a"],
{"a": "b"},
):
self.config["discover"] = bad_value
with self.assertRaises(ValidationError):
OIDCProviderModel.parse_obj(self.config)
# A missing value is okay, because this field has a default.
del self.config["discover"]
model = OIDCProviderModel.parse_obj(self.config)
self.assertIs(model.discover, True)
def test_client_auth_method(self) -> None:
"""This is an example of using a Pydantic string enum field."""
# check the allowed values are permitted and deserialise to an enum member
for method in "client_secret_basic", "client_secret_post", "none":
self.config["client_auth_method"] = method
model = OIDCProviderModel.parse_obj(self.config)
self.assertIs(model.client_auth_method, ClientAuthMethods[method])
# check the default applies if no auth method is provided.
del self.config["client_auth_method"]
model = OIDCProviderModel.parse_obj(self.config)
self.assertIs(model.client_auth_method, ClientAuthMethods.client_secret_basic)
# Check invalid types are rejected
for bad_value in 123, ["client_secret_basic"], {"a": 1}, None:
with self.assertRaises(ValidationError):
self.config["client_auth_method"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# Check that disallowed strings are rejected
with self.assertRaises(ValidationError):
self.config["client_auth_method"] = "No, Luke, _I_ am your father!"
OIDCProviderModel.parse_obj(self.config)
def test_scopes(self) -> None:
"""Example of a Tuple[StrictStr] with a default."""
# Check that the parsed object holds a tuple
self.config["scopes"] = []
model = OIDCProviderModel.parse_obj(self.config)
self.assertEqual(model.scopes, ())
# Check a variety of list lengths are accepted.
for good_value in ["aa"], ["hello", "world"], ["a"] * 4, [""] * 20:
self.config["scopes"] = good_value
model = OIDCProviderModel.parse_obj(self.config)
self.assertEqual(model.scopes, tuple(good_value))
# Check invalid types are rejected.
for bad_value in (
"",
"abc",
123,
{},
{"a": 1},
None,
[None],
[["a"]],
[{}],
[456],
):
with self.assertRaises(ValidationError):
self.config["scopes"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# Check that "scopes" may be omitted.
del self.config["scopes"]
model = OIDCProviderModel.parse_obj(self.config)
self.assertEqual(model.scopes, ("openid",))
@parameterized.expand(["authorization_endpoint", "token_endpoint"])
def test_endpoints_required_when_discovery_disabled(self, key: str) -> None:
"""Example of a validator that applies to multiple fields."""
# Test that this field is required if discovery is disabled
self.config["discover"] = False
with self.assertRaises(ValidationError):
self.config[key] = None
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
del self.config[key]
OIDCProviderModel.parse_obj(self.config)
# We don't validate that the endpoint is a sensible URL; anything str will do
self.config[key] = "blahblah"
OIDCProviderModel.parse_obj(self.config)
def check_all_cases_pass():
self.config[key] = None
OIDCProviderModel.parse_obj(self.config)
del self.config[key]
OIDCProviderModel.parse_obj(self.config)
self.config[key] = "blahblah"
OIDCProviderModel.parse_obj(self.config)
# With discovery enabled, all three cases are accepted.
self.config["discover"] = True
check_all_cases_pass()
# If not specified, discovery is also on by default.
del self.config["discover"]
check_all_cases_pass()
def test_userinfo_endpoint(self) -> None:
"""Example of a more fiddly validator"""
# This field is required if discovery is disabled and the openid scope
# not requested.
self.assertNotIn("userinfo_endpoint", self.config)
with self.assertRaises(ValidationError):
self.config["discover"] = False
self.config["scopes"] = ()
OIDCProviderModel.parse_obj(self.config)
# Still an error even if other scopes are provided
with self.assertRaises(ValidationError):
self.config["discover"] = False
self.config["scopes"] = ("potato", "tomato")
OIDCProviderModel.parse_obj(self.config)
# Passing an explicit None for userinfo_endpoint should also be an error.
with self.assertRaises(ValidationError):
self.config["discover"] = False
self.config["scopes"] = ()
self.config["userinfo_endpoint"] = None
OIDCProviderModel.parse_obj(self.config)
# No error if we enable discovery.
self.config["discover"] = True
self.config["scopes"] = ()
self.config["userinfo_endpoint"] = None
OIDCProviderModel.parse_obj(self.config)
# No error if we enable the openid scope.
self.config["discover"] = False
self.config["scopes"] = ("openid",)
self.config["userinfo_endpoint"] = None
OIDCProviderModel.parse_obj(self.config)
# No error if we don't specify scopes. (They default to `("openid", )`)
self.config["discover"] = False
del self.config["scopes"]
self.config["userinfo_endpoint"] = None
OIDCProviderModel.parse_obj(self.config)
def test_attribute_requirements(self):
# Example of a field involving a nested model
model = OIDCProviderModel.parse_obj(self.config)
self.assertIsInstance(model.attribute_requirements, tuple)
self.assertEqual(
len(model.attribute_requirements), 1, model.attribute_requirements
)
# Bad types should be rejected
bad_value: object
for bad_value in 123, 456.0, False, None, {}, ["hello"]:
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = bad_value
OIDCProviderModel.parse_obj(self.config)
# An empty list of requirements is okay, ...
self.config["attribute_requirements"] = []
OIDCProviderModel.parse_obj(self.config)
# ...as is an omitted list of requirements...
del self.config["attribute_requirements"]
OIDCProviderModel.parse_obj(self.config)
# ...but not an explicit None.
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = None
OIDCProviderModel.parse_obj(self.config)
# Multiple requirements are fine.
self.config["attribute_requirements"] = [{"attribute": "k", "value": "v"}] * 3
model = OIDCProviderModel.parse_obj(self.config)
self.assertEqual(
len(model.attribute_requirements), 3, model.attribute_requirements
)
# The submodel's field types should be enforced too.
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"attribute": "key", "value": 123}]
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"attribute": 123, "value": "val"}]
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"attribute": "a", "value": ["b"]}]
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"attribute": "a", "value": None}]
OIDCProviderModel.parse_obj(self.config)
# Missing fields in the submodel are an error.
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"attribute": "a"}]
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{"value": "v"}]
OIDCProviderModel.parse_obj(self.config)
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [{}]
OIDCProviderModel.parse_obj(self.config)
# Extra fields in the submodel are an error.
with self.assertRaises(ValidationError):
self.config["attribute_requirements"] = [
{"attribute": "a", "value": "v", "answer": "forty-two"}
]
OIDCProviderModel.parse_obj(self.config)