oauth.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import urllib.parse
  2. from dataclasses import dataclass
  3. import requests
  4. @dataclass
  5. class OAuthUserInfo:
  6. id: str
  7. name: str
  8. email: str
  9. class OAuth:
  10. def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
  11. self.client_id = client_id
  12. self.client_secret = client_secret
  13. self.redirect_uri = redirect_uri
  14. def get_authorization_url(self):
  15. raise NotImplementedError()
  16. def get_access_token(self, code: str):
  17. raise NotImplementedError()
  18. def get_raw_user_info(self, token: str):
  19. raise NotImplementedError()
  20. def get_user_info(self, token: str) -> OAuthUserInfo:
  21. raw_info = self.get_raw_user_info(token)
  22. return self._transform_user_info(raw_info)
  23. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  24. raise NotImplementedError()
  25. class GitHubOAuth(OAuth):
  26. _AUTH_URL = 'https://github.com/login/oauth/authorize'
  27. _TOKEN_URL = 'https://github.com/login/oauth/access_token'
  28. _USER_INFO_URL = 'https://api.github.com/user'
  29. _EMAIL_INFO_URL = 'https://api.github.com/user/emails'
  30. def get_authorization_url(self):
  31. params = {
  32. 'client_id': self.client_id,
  33. 'redirect_uri': self.redirect_uri,
  34. 'scope': 'user:email' # Request only basic user information
  35. }
  36. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  37. def get_access_token(self, code: str):
  38. data = {
  39. 'client_id': self.client_id,
  40. 'client_secret': self.client_secret,
  41. 'code': code,
  42. 'redirect_uri': self.redirect_uri
  43. }
  44. headers = {'Accept': 'application/json'}
  45. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  46. response_json = response.json()
  47. access_token = response_json.get('access_token')
  48. if not access_token:
  49. raise ValueError(f"Error in GitHub OAuth: {response_json}")
  50. return access_token
  51. def get_raw_user_info(self, token: str):
  52. headers = {'Authorization': f"token {token}"}
  53. response = requests.get(self._USER_INFO_URL, headers=headers)
  54. response.raise_for_status()
  55. user_info = response.json()
  56. email_response = requests.get(self._EMAIL_INFO_URL, headers=headers)
  57. email_info = email_response.json()
  58. primary_email = next((email for email in email_info if email['primary'] == True), None)
  59. return {**user_info, 'email': primary_email['email']}
  60. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  61. email = raw_info.get('email')
  62. if not email:
  63. email = f"{raw_info['id']}+{raw_info['login']}@users.noreply.github.com"
  64. return OAuthUserInfo(
  65. id=str(raw_info['id']),
  66. name=raw_info['name'],
  67. email=email
  68. )
  69. class GoogleOAuth(OAuth):
  70. _AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
  71. _TOKEN_URL = 'https://oauth2.googleapis.com/token'
  72. _USER_INFO_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
  73. def get_authorization_url(self):
  74. params = {
  75. 'client_id': self.client_id,
  76. 'response_type': 'code',
  77. 'redirect_uri': self.redirect_uri,
  78. 'scope': 'openid email'
  79. }
  80. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  81. def get_access_token(self, code: str):
  82. data = {
  83. 'client_id': self.client_id,
  84. 'client_secret': self.client_secret,
  85. 'code': code,
  86. 'grant_type': 'authorization_code',
  87. 'redirect_uri': self.redirect_uri
  88. }
  89. headers = {'Accept': 'application/json'}
  90. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  91. response_json = response.json()
  92. access_token = response_json.get('access_token')
  93. if not access_token:
  94. raise ValueError(f"Error in Google OAuth: {response_json}")
  95. return access_token
  96. def get_raw_user_info(self, token: str):
  97. headers = {'Authorization': f"Bearer {token}"}
  98. response = requests.get(self._USER_INFO_URL, headers=headers)
  99. response.raise_for_status()
  100. return response.json()
  101. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  102. return OAuthUserInfo(
  103. id=str(raw_info['sub']),
  104. name=None,
  105. email=raw_info['email']
  106. )