

Streaming messages

Ragna supports streaming responses from the assistant. This example showcases how to do so with both the Python API and the REST API.

Before we start this example, we import some helpers.

import sys
from pathlib import Path

sys.path.insert(0, str(Path.cwd().parent))

import documentation_helpers

Set up a streaming assistant

To stream a message from an assistant, the assistant needs to support streaming. For this example, we subclass ragna.assistants.RagnaDemoAssistant, split its message on whitespace, and yield the individual chunks.

from ragna import assistants


class DemoStreamingAssistant(assistants.RagnaDemoAssistant):
    def answer(self, prompt, sources):
        content = next(super().answer(prompt, sources))
        for chunk in content.split(" "):
            yield f"{chunk} "
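
To see what this generator yields, here is a minimal standalone sketch of the same split-and-yield pattern, independent of Ragna. The content string is only an illustration:

def stream_words(content):
    # Same pattern as above: split on spaces and re-append a trailing
    # space so the chunks can be concatenated back together
    for chunk in content.split(" "):
        yield f"{chunk} "

print(list(stream_words("What is Ragna?")))

Out:

['What ', 'is ', 'Ragna? ']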

Python API

Let's create and prepare a chat using the assistant we have defined above.

from ragna import Rag, source_storages

document_path = documentation_helpers.assets / "ragna.txt"

chat = Rag().chat(
    documents=[document_path],
    source_storage=source_storages.RagnaDemoSourceStorage,
    assistant=DemoStreamingAssistant,
)
_ = await chat.prepare()
message = await chat.answer("What is Ragna?", stream=True)

At this stage, we cannot access the content of the message, e.g. by printing it.

try:
    print(message)
except Exception as error:
    print(f"{type(error).__name__}: {error}")

Out:

RuntimeError: Message content cannot be accessed without having iterated over it, e.g. `async for chunk in message`, or reading the content, e.g. `await message.read()`, first.

To get the individual chunks, we asynchronously iterate over the message.

chunks = [chunk async for chunk in message]

print(len(chunks))
print(chunks)

Out:

48
["I'm ", 'a ', 'demo ', 'assistant ', 'and ', 'can ', 'be ', 'used ', 'to ', 'try ', 'Ragnas ', 'workflow.\nI ', 'will ', 'only ', 'mirror ', 'back ', 'my ', 'inputs. ', '\n\nYour ', 'prompt ', 'was:\n\n> ', 'What ', 'is ', 'Ragna?\n\nThese ', 'are ', 'the ', 'sources ', 'I ', 'was ', 'given:\n\n- ', 'ragna.txt: ', 'Ragna ', 'is ', 'an ', 'open ', 'source ', 'project ', 'built ', 'by ', 'Quansight. ', 'It ', 'is ', 'designed ', 'to ', 'allow ', 'organizations ', 'to ', '[...] ']

Joining the chunks together results in the full message.

print("".join(chunks))

Out:

I'm a demo assistant and can be used to try Ragnas workflow.
I will only mirror back my inputs. 

Your prompt was:

> What is Ragna?

These are the sources I was given:

- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...] 
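
The RuntimeError we hit earlier also mentioned awaiting message.read() as an alternative to iterating manually. A minimal sketch, assuming we request a fresh answer, since the message above has already been consumed:

message = await chat.answer("What is Ragna?", stream=True)

# read() drains the stream internally and returns the full content at once
content = await message.read()
print(content)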

REST API

We start by creating a configuration that registers our streaming assistant, spin up the REST API with it, and authenticate.

from ragna.deploy import Config

config = Config(assistants=[DemoStreamingAssistant])

rest_api = documentation_helpers.RestApi()

client = rest_api.start(config, authenticate=True)

Out:

Starting Ragna REST API
...

Upload the document. This happens in two steps: registering the document returns the upload parameters, which we then use to send the actual file.

document_upload = (
    client.post("/document", json={"name": document_path.name})
    .raise_for_status()
    .json()
)

document = document_upload["document"]

parameters = document_upload["parameters"]
client.request(
    parameters["method"],
    parameters["url"],
    data=parameters["data"],
    files={"file": open(document_path, "rb")},
).raise_for_status()

Out:

<Response [200 OK]>

Start and prepare the chat.

chat = (
    client.post(
        "/chats",
        json={
            "name": "Tutorial REST API",
            "documents": [document],
            "source_storage": source_storages.RagnaDemoSourceStorage.display_name(),
            "assistant": DemoStreamingAssistant.display_name(),
            "params": {},
        },
    )
    .raise_for_status()
    .json()
)

client.post(f"/chats/{chat['id']}/prepare").raise_for_status()

Out:

<Response [200 OK]>

Streaming the response is performed with JSON Lines (JSONL): each line in the response body is valid JSON and corresponds to one chunk.

import json


with client.stream(
    "POST",
    f"/chats/{chat['id']}/answer",
    json={"prompt": "What is Ragna?", "stream": True},
) as response:
    chunks = [json.loads(data) for data in response.iter_lines()]
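
Because every line is complete JSON on its own, we do not have to collect the whole response before acting on it. A minimal sketch of processing the stream incrementally, using the same endpoint and payload as above:

with client.stream(
    "POST",
    f"/chats/{chat['id']}/answer",
    json={"prompt": "What is Ragna?", "stream": True},
) as response:
    for line in response.iter_lines():
        # each line is one chunk; print its content as soon as it arrives
        print(json.loads(line)["content"], end="", flush=True)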

The first chunk contains the full message object, including the sources, alongside the first chunk of the content.

print(len(chunks))
print(json.dumps(chunks[0], indent=2))

Out:

48
{
  "id": "4d49e1af-7d2f-4500-8099-daf5c8deec51",
  "content": "I'm ",
  "role": "assistant",
  "sources": [
    {
      "id": "f86a79b4-50d5-40c7-86f7-4c8a832b09bb",
      "document": {
        "id": "9b229b09-f17b-4082-ba21-e53d096eb940",
        "name": "ragna.txt"
      },
      "location": "",
      "content": "Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]",
      "num_tokens": 17
    }
  ],
  "timestamp": "2024-04-21T13:59:53.827535"
}

Subsequent chunks no longer contain the sources.

print(json.dumps(chunks[1], indent=2))

Out:

{
  "id": "4d49e1af-7d2f-4500-8099-daf5c8deec51",
  "content": "a ",
  "role": "assistant",
  "sources": null,
  "timestamp": "2024-04-21T13:59:53.827535"
}
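
Since only the first chunk carries the sources, we can extract them once up front and handle the content separately. A small sketch based on the JSON structure shown above:

sources = chunks[0]["sources"]
print([source["document"]["name"] for source in sources])

Out:

['ragna.txt']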

Joining the content of the chunks together results in the full message.

print("".join(chunk["content"] for chunk in chunks))

Out:

I'm a demo assistant and can be used to try Ragnas workflow.
I will only mirror back my inputs. 

Your prompt was:

> What is Ragna?

These are the sources I was given:

- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...] 

Before we close the example, let's stop the REST API and have a look at what would have been printed in the terminal if we had started it the regular way.

rest_api.stop()

Out:

INFO:   RagnaDemoAuthentication: You can log in with any username and a matching password.
INFO:     127.0.0.1:40104 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "POST /token HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "POST /document HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "PUT /document HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "POST /chats HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "POST /chats/c163adea-b05e-4fc4-a160-6554eefebb1a/prepare HTTP/1.1" 200 OK
INFO:     127.0.0.1:40104 - "POST /chats/c163adea-b05e-4fc4-a160-6554eefebb1a/answer HTTP/1.1" 200 OK

Total running time of the script: (0 minutes 4.714 seconds)
