ext_celery.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from datetime import timedelta
  2. from celery import Celery, Task
  3. from flask import Flask
  4. def init_app(app: Flask) -> Celery:
  5. class FlaskTask(Task):
  6. def __call__(self, *args: object, **kwargs: object) -> object:
  7. with app.app_context():
  8. return self.run(*args, **kwargs)
  9. celery_app = Celery(
  10. app.name,
  11. task_cls=FlaskTask,
  12. broker=app.config["CELERY_BROKER_URL"],
  13. backend=app.config["CELERY_BACKEND"],
  14. task_ignore_result=True,
  15. )
  16. # Add SSL options to the Celery configuration
  17. ssl_options = {
  18. "ssl_cert_reqs": None,
  19. "ssl_ca_certs": None,
  20. "ssl_certfile": None,
  21. "ssl_keyfile": None,
  22. }
  23. celery_app.conf.update(
  24. result_backend=app.config["CELERY_RESULT_BACKEND"],
  25. broker_connection_retry_on_startup=True,
  26. )
  27. if app.config["BROKER_USE_SSL"]:
  28. celery_app.conf.update(
  29. broker_use_ssl=ssl_options, # Add the SSL options to the broker configuration
  30. )
  31. celery_app.set_default()
  32. app.extensions["celery"] = celery_app
  33. imports = [
  34. "schedule.clean_embedding_cache_task",
  35. "schedule.clean_unused_datasets_task",
  36. ]
  37. beat_schedule = {
  38. 'clean_embedding_cache_task': {
  39. 'task': 'schedule.clean_embedding_cache_task.clean_embedding_cache_task',
  40. 'schedule': timedelta(days=1),
  41. },
  42. 'clean_unused_datasets_task': {
  43. 'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task',
  44. 'schedule': timedelta(minutes=3),
  45. }
  46. }
  47. celery_app.conf.update(
  48. beat_schedule=beat_schedule,
  49. imports=imports
  50. )
  51. return celery_app