| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 | import jsonfrom core.helper import encrypterfrom extensions.ext_database import dbfrom models.source import DataSourceApiKeyAuthBindingfrom services.auth.api_key_auth_factory import ApiKeyAuthFactoryclass ApiKeyAuthService:    @staticmethod    def get_provider_auth_list(tenant_id: str) -> list:        data_source_api_key_bindings = db.session.query(DataSourceApiKeyAuthBinding).filter(            DataSourceApiKeyAuthBinding.tenant_id == tenant_id,            DataSourceApiKeyAuthBinding.disabled.is_(False)        ).all()        return data_source_api_key_bindings    @staticmethod    def create_provider_auth(tenant_id: str, args: dict):        auth_result = ApiKeyAuthFactory(args['provider'], args['credentials']).validate_credentials()        if auth_result:            # Encrypt the api key            api_key = encrypter.encrypt_token(tenant_id, args['credentials']['config']['api_key'])            args['credentials']['config']['api_key'] = api_key            data_source_api_key_binding = DataSourceApiKeyAuthBinding()            data_source_api_key_binding.tenant_id = tenant_id            data_source_api_key_binding.category = args['category']            data_source_api_key_binding.provider = args['provider']            data_source_api_key_binding.credentials = json.dumps(args['credentials'], ensure_ascii=False)            db.session.add(data_source_api_key_binding)            db.session.commit()    @staticmethod    def get_auth_credentials(tenant_id: str, category: str, provider: str):        data_source_api_key_bindings = db.session.query(DataSourceApiKeyAuthBinding).filter(            DataSourceApiKeyAuthBinding.tenant_id == tenant_id,            DataSourceApiKeyAuthBinding.category == category,            DataSourceApiKeyAuthBinding.provider == provider,            DataSourceApiKeyAuthBinding.disabled.is_(False)        ).first()        if not data_source_api_key_bindings:            return None        credentials = json.loads(data_source_api_key_bindings.credentials)        return credentials    @staticmethod    def delete_provider_auth(tenant_id: str, binding_id: str):        data_source_api_key_binding = db.session.query(DataSourceApiKeyAuthBinding).filter(            DataSourceApiKeyAuthBinding.tenant_id == tenant_id,            DataSourceApiKeyAuthBinding.id == binding_id        ).first()        if data_source_api_key_binding:            db.session.delete(data_source_api_key_binding)            db.session.commit()    @classmethod    def validate_api_key_auth_args(cls, args):        if 'category' not in args or not args['category']:            raise ValueError('category is required')        if 'provider' not in args or not args['provider']:            raise ValueError('provider is required')        if 'credentials' not in args or not args['credentials']:            raise ValueError('credentials is required')        if not isinstance(args['credentials'], dict):            raise ValueError('credentials must be a dictionary')        if 'auth_type' not in args['credentials'] or not args['credentials']['auth_type']:            raise ValueError('auth_type is required')
 |