import json
import logging
import time

import click
from celery import shared_task

from core.indexing_runner import DocumentIsPausedException
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import Dataset, ExternalKnowledgeApis
from models.model import UploadFile
from services.external_knowledge_service import ExternalDatasetService


@shared_task(queue="dataset")
def external_document_indexing_task(
    dataset_id: str, external_knowledge_api_id: str, data_source: dict, process_parameter: dict
):
- """
- Async process document
- :param dataset_id:
- :param external_knowledge_api_id:
- :param data_source:
- :param process_parameter:
- Usage: external_document_indexing_task.delay(dataset_id, document_id)
- """
    start_at = time.perf_counter()

    dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
    if not dataset:
        logging.info(
            click.style("Processed external dataset: {} failed, dataset does not exist.".format(dataset_id), fg="red")
        )
        return
    # get external api template
    external_knowledge_api = (
        db.session.query(ExternalKnowledgeApis)
        .filter(
            ExternalKnowledgeApis.id == external_knowledge_api_id, ExternalKnowledgeApis.tenant_id == dataset.tenant_id
        )
        .first()
    )
    if not external_knowledge_api:
        logging.info(
            click.style(
                "Processed external dataset: {} failed, api template: {} does not exist.".format(
                    dataset_id, external_knowledge_api_id
                ),
                fg="red",
            )
        )
        return
    # load referenced upload files so they can be forwarded to the external API
    files = {}
    if data_source["type"] == "upload_file":
        upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"]
        for file_id in upload_file_list:
            file = (
                db.session.query(UploadFile)
                .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
                .first()
            )
            if file:
                files[file.id] = (file.name, storage.load_once(file.key), file.mime_type)
    try:
        settings = ExternalDatasetService.get_external_knowledge_api_settings(
            json.loads(external_knowledge_api.settings)
        )
        # assemble headers
        headers = ExternalDatasetService.assembling_headers(settings.authorization, settings.headers)
        # do http request
        response = ExternalDatasetService.process_external_api(settings, headers, process_parameter, files)
        job_id = response.json().get("job_id")
        if job_id:
            # save job_id to dataset
            dataset.job_id = job_id
            db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style(
                "Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at),
                fg="green",
            )
        )
    except DocumentIsPausedException as ex:
        logging.info(click.style(str(ex), fg="yellow"))
    except Exception:
        # don't swallow failures silently; record the traceback for diagnosis
        logging.exception("Processed external dataset: {} failed".format(dataset_id))
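
# Illustrative dispatch sketch (not part of the task itself): the task is queued through
# Celery's .delay() with the arguments named in the signature above. The data_source shape
# mirrors the keys this task reads ("type", "info_list.file_info_list.file_ids"); the
# concrete IDs and the process_parameter contents below are hypothetical placeholders.
#
#   external_document_indexing_task.delay(
#       "dataset-uuid",
#       "external-knowledge-api-uuid",
#       {"type": "upload_file", "info_list": {"file_info_list": {"file_ids": ["file-uuid"]}}},
#       {},
#   )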