ssrf_proxy.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. """
  2. Proxy requests to avoid SSRF
  3. """
  4. import logging
  5. import os
  6. import time
  7. import httpx
  8. SSRF_PROXY_ALL_URL = os.getenv('SSRF_PROXY_ALL_URL', '')
  9. SSRF_PROXY_HTTP_URL = os.getenv('SSRF_PROXY_HTTP_URL', '')
  10. SSRF_PROXY_HTTPS_URL = os.getenv('SSRF_PROXY_HTTPS_URL', '')
  11. SSRF_DEFAULT_MAX_RETRIES = int(os.getenv('SSRF_DEFAULT_MAX_RETRIES', '3'))
  12. proxies = {
  13. 'http://': SSRF_PROXY_HTTP_URL,
  14. 'https://': SSRF_PROXY_HTTPS_URL
  15. } if SSRF_PROXY_HTTP_URL and SSRF_PROXY_HTTPS_URL else None
  16. BACKOFF_FACTOR = 0.5
  17. STATUS_FORCELIST = [429, 500, 502, 503, 504]
  18. def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  19. if "allow_redirects" in kwargs:
  20. allow_redirects = kwargs.pop("allow_redirects")
  21. if "follow_redirects" not in kwargs:
  22. kwargs["follow_redirects"] = allow_redirects
  23. retries = 0
  24. while retries <= max_retries:
  25. try:
  26. if SSRF_PROXY_ALL_URL:
  27. response = httpx.request(method=method, url=url, proxy=SSRF_PROXY_ALL_URL, **kwargs)
  28. elif proxies:
  29. response = httpx.request(method=method, url=url, proxies=proxies, **kwargs)
  30. else:
  31. response = httpx.request(method=method, url=url, **kwargs)
  32. if response.status_code not in STATUS_FORCELIST:
  33. return response
  34. else:
  35. logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")
  36. except httpx.RequestError as e:
  37. logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
  38. retries += 1
  39. if retries <= max_retries:
  40. time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
  41. raise Exception(f"Reached maximum retries ({max_retries}) for URL {url}")
  42. def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  43. return make_request('GET', url, max_retries=max_retries, **kwargs)
  44. def post(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  45. return make_request('POST', url, max_retries=max_retries, **kwargs)
  46. def put(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  47. return make_request('PUT', url, max_retries=max_retries, **kwargs)
  48. def patch(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  49. return make_request('PATCH', url, max_retries=max_retries, **kwargs)
  50. def delete(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  51. return make_request('DELETE', url, max_retries=max_retries, **kwargs)
  52. def head(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
  53. return make_request('HEAD', url, max_retries=max_retries, **kwargs)