oauth.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import urllib.parse
  2. from dataclasses import dataclass
  3. from typing import Optional
  4. import requests
  5. @dataclass
  6. class OAuthUserInfo:
  7. id: str
  8. name: str
  9. email: str
  10. class OAuth:
  11. def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
  12. self.client_id = client_id
  13. self.client_secret = client_secret
  14. self.redirect_uri = redirect_uri
  15. def get_authorization_url(self):
  16. raise NotImplementedError()
  17. def get_access_token(self, code: str):
  18. raise NotImplementedError()
  19. def get_raw_user_info(self, token: str):
  20. raise NotImplementedError()
  21. def get_user_info(self, token: str) -> OAuthUserInfo:
  22. raw_info = self.get_raw_user_info(token)
  23. return self._transform_user_info(raw_info)
  24. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  25. raise NotImplementedError()
  26. class GitHubOAuth(OAuth):
  27. _AUTH_URL = "https://github.com/login/oauth/authorize"
  28. _TOKEN_URL = "https://github.com/login/oauth/access_token"
  29. _USER_INFO_URL = "https://api.github.com/user"
  30. _EMAIL_INFO_URL = "https://api.github.com/user/emails"
  31. def get_authorization_url(self, invite_token: Optional[str] = None):
  32. params = {
  33. "client_id": self.client_id,
  34. "redirect_uri": self.redirect_uri,
  35. "scope": "user:email", # Request only basic user information
  36. }
  37. if invite_token:
  38. params["state"] = invite_token
  39. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  40. def get_access_token(self, code: str):
  41. data = {
  42. "client_id": self.client_id,
  43. "client_secret": self.client_secret,
  44. "code": code,
  45. "redirect_uri": self.redirect_uri,
  46. }
  47. headers = {"Accept": "application/json"}
  48. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  49. response_json = response.json()
  50. access_token = response_json.get("access_token")
  51. if not access_token:
  52. raise ValueError(f"Error in GitHub OAuth: {response_json}")
  53. return access_token
  54. def get_raw_user_info(self, token: str):
  55. headers = {"Authorization": f"token {token}"}
  56. response = requests.get(self._USER_INFO_URL, headers=headers)
  57. response.raise_for_status()
  58. user_info = response.json()
  59. email_response = requests.get(self._EMAIL_INFO_URL, headers=headers)
  60. email_info = email_response.json()
  61. primary_email = next((email for email in email_info if email["primary"] == True), None)
  62. return {**user_info, "email": primary_email["email"]}
  63. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  64. email = raw_info.get("email")
  65. if not email:
  66. email = f"{raw_info['id']}+{raw_info['login']}@users.noreply.github.com"
  67. return OAuthUserInfo(id=str(raw_info["id"]), name=raw_info["name"], email=email)
  68. class GoogleOAuth(OAuth):
  69. _AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
  70. _TOKEN_URL = "https://oauth2.googleapis.com/token"
  71. _USER_INFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
  72. def get_authorization_url(self, invite_token: Optional[str] = None):
  73. params = {
  74. "client_id": self.client_id,
  75. "response_type": "code",
  76. "redirect_uri": self.redirect_uri,
  77. "scope": "openid email",
  78. }
  79. if invite_token:
  80. params["state"] = invite_token
  81. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  82. def get_access_token(self, code: str):
  83. data = {
  84. "client_id": self.client_id,
  85. "client_secret": self.client_secret,
  86. "code": code,
  87. "grant_type": "authorization_code",
  88. "redirect_uri": self.redirect_uri,
  89. }
  90. headers = {"Accept": "application/json"}
  91. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  92. response_json = response.json()
  93. access_token = response_json.get("access_token")
  94. if not access_token:
  95. raise ValueError(f"Error in Google OAuth: {response_json}")
  96. return access_token
  97. def get_raw_user_info(self, token: str):
  98. headers = {"Authorization": f"Bearer {token}"}
  99. response = requests.get(self._USER_INFO_URL, headers=headers)
  100. response.raise_for_status()
  101. return response.json()
  102. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  103. return OAuthUserInfo(id=str(raw_info["sub"]), name=None, email=raw_info["email"])