from core.moderation.base import Moderation, ModerationAction, ModerationInputsResult, ModerationOutputsResult


class KeywordsModeration(Moderation):
    """Moderation that flags content containing any configured keyword.

    The keyword list is read from ``self.config['keywords']`` as a
    newline-separated string; matching is a case-insensitive substring test.
    """

    name: str = "keywords"

    @classmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        """
        Validate the incoming form config data.

        :param tenant_id: the id of workspace
        :param config: the form config data
        :return:
        :raises ValueError: if ``keywords`` is missing/empty or longer than 1000
        """
        cls._validate_inputs_and_outputs_config(config, True)

        keywords = config.get("keywords")
        if not keywords:
            raise ValueError("keywords is required")

        # NOTE: a length of exactly 1000 is accepted even though the message
        # says "less than"; the message is kept verbatim for compatibility.
        if len(keywords) > 1000:
            raise ValueError("keywords length must be less than 1000")

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        """
        Check the input values (and the query, when given) against the keywords.

        :param inputs: the input values to check; this dict is not modified
        :param query: optional query text, checked alongside the inputs
        :return: result flagged when any value contains a keyword; the preset
            response is only filled in when input moderation is enabled
        """
        flagged = False
        preset_response = ""

        inputs_config = self.config['inputs_config']
        if inputs_config['enabled']:
            preset_response = inputs_config['preset_response']

            # Work on a copy so the caller's dict does not gain a 'query__' key.
            values = dict(inputs)
            if query:
                values['query__'] = query

            flagged = self._is_violated(values, self._keywords_list())

        return ModerationInputsResult(
            flagged=flagged,
            action=ModerationAction.DIRECT_OUTPUT,
            preset_response=preset_response,
        )

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        """
        Check generated output text against the keywords.

        :param text: the output text to check
        :return: result flagged when the text contains a keyword; the preset
            response is only filled in when output moderation is enabled
        """
        flagged = False
        preset_response = ""

        outputs_config = self.config['outputs_config']
        if outputs_config['enabled']:
            flagged = self._is_violated({'text': text}, self._keywords_list())
            preset_response = outputs_config['preset_response']

        return ModerationOutputsResult(
            flagged=flagged,
            action=ModerationAction.DIRECT_OUTPUT,
            preset_response=preset_response,
        )

    def _keywords_list(self) -> list:
        """Return the configured keywords, one per line, with empty lines dropped."""
        return [keyword for keyword in self.config['keywords'].split('\n') if keyword]

    def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
        """
        Tell whether any value of ``inputs`` contains one of the keywords.

        :param inputs: mapping whose values are checked (keys are ignored)
        :param keywords_list: keywords to look for
        :return: True as soon as one value matches, otherwise False
        """
        return any(self._check_keywords_in_value(keywords_list, value) for value in inputs.values())

    def _check_keywords_in_value(self, keywords_list, value) -> bool:
        """
        Case-insensitive substring test of every keyword against one value.

        :param keywords_list: keywords to look for
        :param value: the value to check; non-string values are converted with
            ``str()`` and ``None`` is treated as having no text
        :return: True if any keyword occurs in the value, otherwise False
        """
        if value is None:
            return False

        # Input values are not guaranteed to be strings (numbers, lists, ...);
        # coerce once here instead of failing on a missing ``.lower()``.
        haystack = str(value).lower()
        return any(keyword.lower() in haystack for keyword in keywords_list)