ext_celery.py 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. from celery import Task, Celery
  2. from flask import Flask
  def init_app(app: Flask) -> Celery:
      """Build a Celery application for ``app`` and register it as an extension.

      Every task created through the returned Celery app uses a task base class
      whose ``__call__`` runs the task body inside ``app.app_context()``.

      Configuration is read from ``app.config`` by subscript, so each of these
      keys must be present:

      * ``CELERY_BROKER_URL`` - passed to ``Celery`` as ``broker``.
      * ``CELERY_BACKEND`` - passed to ``Celery`` as ``backend``.
      * ``CELERY_RESULT_BACKEND`` - applied afterwards as ``result_backend``.
      * ``BROKER_USE_SSL`` - when truthy, ``broker_use_ssl`` is set as well.

      Args:
          app: The Flask application the Celery app is tied to.

      Returns:
          The configured Celery app. It is also made the default Celery app
          via ``set_default()`` and stored under ``app.extensions["celery"]``.
      """
      flask_config = app.config

      class FlaskTask(Task):
          """Task base class that wraps each run in ``app``'s application context."""

          def __call__(self, *args: object, **kwargs: object) -> object:
              # The context is entered per call and left again before returning.
              with app.app_context():
                  outcome = self.run(*args, **kwargs)
              return outcome

      celery_instance = Celery(
          app.name,
          broker=flask_config["CELERY_BROKER_URL"],
          backend=flask_config["CELERY_BACKEND"],
          task_cls=FlaskTask,
          task_ignore_result=True,
      )

      # Applied after construction, so this value is the one left in the
      # configuration under ``result_backend``.
      celery_instance.conf.update(
          result_backend=flask_config["CELERY_RESULT_BACKEND"],
      )

      if flask_config["BROKER_USE_SSL"]:
          # All four SSL options are passed explicitly with the value None.
          # NOTE(review): no CA bundle, client certificate, key or verification
          # mode is supplied here; presumably the broker connection is then
          # encrypted but the peer certificate is not verified - confirm
          # against the broker transport actually in use.
          broker_ssl_settings = dict.fromkeys(
              ("ssl_cert_reqs", "ssl_ca_certs", "ssl_certfile", "ssl_keyfile")
          )
          celery_instance.conf.update(broker_use_ssl=broker_ssl_settings)

      celery_instance.set_default()
      app.extensions["celery"] = celery_instance
      return celery_instance