update_tidb_serverless_status_task.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import time
  2. import click
  3. import app
  4. from configs import dify_config
  5. from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
  6. from models.dataset import TidbAuthBinding
  7. @app.celery.task(queue="dataset")
  8. def update_tidb_serverless_status_task():
  9. click.echo(click.style("Update tidb serverless status task.", fg="green"))
  10. start_at = time.perf_counter()
  11. while True:
  12. try:
  13. # check the number of idle tidb serverless
  14. tidb_serverless_list = TidbAuthBinding.query.filter(
  15. TidbAuthBinding.active == False, TidbAuthBinding.status == "CREATING"
  16. ).all()
  17. if len(tidb_serverless_list) == 0:
  18. break
  19. # update tidb serverless status
  20. iterations_per_thread = 20
  21. update_clusters(tidb_serverless_list)
  22. except Exception as e:
  23. click.echo(click.style(f"Error: {e}", fg="red"))
  24. break
  25. end_at = time.perf_counter()
  26. click.echo(
  27. click.style("Update tidb serverless status task success latency: {}".format(end_at - start_at), fg="green")
  28. )
  29. def update_clusters(tidb_serverless_list: list[TidbAuthBinding]):
  30. try:
  31. # batch 20
  32. for i in range(0, len(tidb_serverless_list), 20):
  33. items = tidb_serverless_list[i : i + 20]
  34. TidbService.batch_update_tidb_serverless_cluster_status(
  35. items,
  36. dify_config.TIDB_PROJECT_ID,
  37. dify_config.TIDB_API_URL,
  38. dify_config.TIDB_IAM_API_URL,
  39. dify_config.TIDB_PUBLIC_KEY,
  40. dify_config.TIDB_PRIVATE_KEY,
  41. )
  42. except Exception as e:
  43. click.echo(click.style(f"Error: {e}", fg="red"))