# Pytest helpers that swap ``Completions.create`` for an offline mock.
#
# Patching is opt-in: it only happens when the ``MOCK_SWITCH`` environment
# variable is set to the exact string ``true``.
import os
from time import sleep
from typing import Any, Generator, List, Literal, Union

import anthropic
import pytest
from _pytest.monkeypatch import MonkeyPatch
from anthropic import Anthropic
from anthropic._types import NOT_GIVEN, Body, Headers, NotGiven, Query
from anthropic.resources.completions import Completions
from anthropic.types import Completion, completion_create_params

# Evaluated once at import time; any value other than 'true' disables the mock.
MOCK = os.getenv('MOCK_SWITCH', 'false') == 'true'


class MockAnthropicClass(object):
    """Canned replacements for the Anthropic text-completions call."""

    @staticmethod
    def mocked_anthropic_chat_create_sync(model: str) -> Completion:
        """Return a single fixed ``Completion`` attributed to ``model``."""
        return Completion(
            completion='hello, I\'m a chatbot from anthropic',
            model=model,
            stop_reason='stop_sequence'
        )

    @staticmethod
    def mocked_anthropic_chat_create_stream(model: str) -> Generator[Completion, None, None]:
        """Yield the fixed reply one character at a time, then a terminator.

        Every chunk (including the final one) is preceded by a 0.1 second
        sleep. Character chunks carry an empty ``stop_reason``; the final
        chunk has an empty ``completion`` and ``stop_reason='stop_sequence'``.
        """
        full_response_text = "hello, I'm a chatbot from anthropic"
        for char in full_response_text:
            sleep(0.1)
            yield Completion(
                completion=char,
                model=model,
                stop_reason=''
            )
        # Closing chunk: no text, only the stop marker.
        sleep(0.1)
        yield Completion(
            completion='',
            model=model,
            stop_reason='stop_sequence'
        )

    def mocked_anthropic(self: Completions, *,
        max_tokens_to_sample: int,
        model: Union[str, Literal["claude-2.1", "claude-instant-1"]],
        prompt: str,
        stream: Union[bool, NotGiven] = NOT_GIVEN,
        **kwargs: Any
    ) -> Union[Completion, Generator[Completion, None, None]]:
        """Stand-in for ``Completions.create``, installed by ``setup_anthropic_mock``.

        Args:
            max_tokens_to_sample: Accepted for signature compatibility; unused.
            model: Model name echoed back on every returned ``Completion``.
            prompt: Accepted for signature compatibility; unused.
            stream: Truthy selects the streaming generator. Omitted
                (``NOT_GIVEN``) or falsy selects the single-shot reply, so
                callers that never pass ``stream`` no longer hit a
                ``TypeError`` for a missing keyword argument.
            **kwargs: Any other keyword arguments are accepted and ignored.

        Returns:
            A generator of ``Completion`` chunks when streaming, otherwise
            one ``Completion``.

        Raises:
            anthropic.AuthenticationError: When the client's API key is
                shorter than 18 characters.
        """
        if len(self._client.api_key) < 18:
            # NOTE(review): some anthropic SDK releases require ``response=``
            # and ``body=`` keyword arguments on status errors -- confirm
            # this constructor call works with the pinned SDK version.
            raise anthropic.AuthenticationError('Invalid API key')

        # Treat the NOT_GIVEN sentinel explicitly as "not streaming" rather
        # than relying on its truthiness.
        wants_stream = not isinstance(stream, NotGiven) and bool(stream)
        if wants_stream:
            return MockAnthropicClass.mocked_anthropic_chat_create_stream(model=model)
        return MockAnthropicClass.mocked_anthropic_chat_create_sync(model=model)


@pytest.fixture
def setup_anthropic_mock(request, monkeypatch: MonkeyPatch):
    """Patch ``Completions.create`` with the mock while ``MOCK`` is enabled.

    When ``MOCK`` is false the fixture only yields and patches nothing.
    """
    if MOCK:
        monkeypatch.setattr(Completions, 'create', MockAnthropicClass.mocked_anthropic)

    yield

    if MOCK:
        # Explicitly roll back the patch once the test has finished.
        monkeypatch.undo()