Streaming messages

Ragna supports streaming responses from the assistant. This example shows how to stream a response using both the Python API and the REST API.

Setup streaming assistant

An assistant can only stream a message if it supports streaming. For this example, we subclass ragna.assistants.RagnaDemoAssistant, split its message on spaces, and yield the individual chunks one at a time.

from ragna import assistants


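class DemoStreamingAssistant(assistants.RagnaDemoAssistant):
    def answer(self, messages):
        # Take the full answer of the demo assistant ...
        content = next(super().answer(messages))
        # ... and re-emit it chunk by chunk to simulate streaming.
        for chunk in content.split(" "):
            yield f"{chunk} "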
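
The chunking logic can be tried in isolation, without Ragna. The following is a minimal sketch: `split_into_chunks` is a hypothetical helper name and the sample text is made up. It mirrors the loop above and shows two details that are easy to miss: newlines are not split points, and every chunk, including the last one, gets a trailing space.

```python
from typing import Iterator


def split_into_chunks(content: str) -> Iterator[str]:
    # Hypothetical helper mirroring the loop in DemoStreamingAssistant.answer:
    # split on single spaces and re-append a space to every chunk.
    for chunk in content.split(" "):
        yield f"{chunk} "


# Made-up sample text, not the actual output of the demo assistant.
content = "I'm a demo assistant.\nI will only mirror back my inputs."
chunks = list(split_into_chunks(content))

print(chunks)
# Joining the chunks gives the original text plus one extra trailing space.
print("".join(chunks) == content + " ")
```

This is why the joined message later in this example ends with a space.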

Python API

Let's create and prepare a chat using the assistant we have defined above.

from pathlib import Path

import ragna._docs as ragna_docs

from ragna import Rag, source_storages

print(ragna_docs.SAMPLE_CONTENT)

document_path = Path.cwd() / "ragna.txt"

with open(document_path, "w") as file:
    file.write(ragna_docs.SAMPLE_CONTENT)

chat = Rag().chat(
    input=[document_path],
    source_storage=source_storages.RagnaDemoSourceStorage,
    assistant=DemoStreamingAssistant,
)
_ = await chat.prepare()

Out:

Ragna is an open source project built by Quansight. It is designed to allow
organizations to explore the power of Retrieval-augmented generation (RAG) based
AI tools. Ragna provides an intuitive API for quick experimentation and built-in
tools for creating production-ready applications allowing you to quickly leverage
Large Language Models (LLMs) for your work.

The Ragna website is https://ragna.chat/. The source code is available at
https://github.com/Quansight/ragna under the BSD 3-Clause license.

To stream the answer, set the stream=True flag when calling ragna.core.Chat.answer.

message = await chat.answer("What is Ragna?", stream=True)

At this stage, we cannot access the content of the message, e.g. by printing it.

try:
    print(message)
except Exception as error:
    print(f"{type(error).__name__}: {error}")

Out:

RuntimeError: Message content cannot be accessed without having iterated over it, e.g. `async for chunk in message`, or reading the content, e.g. `await message.read()`, first.

To get the individual chunks, we asynchronously iterate over the message.

chunks = [chunk async for chunk in message]

print(len(chunks))
print(chunks)

Out:

55
["I'm ", 'a ', 'demo ', 'assistant ', 'and ', 'can ', 'be ', 'used ', 'to ', 'try ', "Ragna's ", 'workflow.\nI ', 'will ', 'only ', 'mirror ', 'back ', 'my ', 'inputs. ', '\n\nSo ', 'far ', 'I ', 'have ', 'received ', '1 ', 'messages.\n\nYour ', 'last ', 'prompt ', 'was:\n\n> ', 'What ', 'is ', 'Ragna?\n\nThese ', 'are ', 'the ', 'sources ', 'I ', 'was ', 'given:\n\n- ', 'ragna.txt: ', 'Ragna ', 'is ', 'an ', 'open ', 'source ', 'project ', 'built ', 'by ', 'Quansight. ', 'It ', 'is ', 'designed ', 'to ', 'allow ', 'organizations ', 'to ', '[...] ']

Joining the chunks together results in the full message.

print("".join(chunks))

Out:

I'm a demo assistant and can be used to try Ragna's workflow.
I will only mirror back my inputs. 

So far I have received 1 messages.

Your last prompt was:

> What is Ragna?

These are the sources I was given:

- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...] 
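
The behaviour seen in this section, where the content only becomes available after iterating over the message or reading it, can be imitated with a small stand-in class. This is a sketch of the general pattern under stated assumptions, not Ragna's implementation: `LazyMessage`, `fake_stream`, and the error text are all hypothetical. `asyncio.run` is used so that the sketch runs as a plain script; in a notebook you would `await main()` instead.

```python
import asyncio
from typing import AsyncIterator, Optional


async def fake_stream() -> AsyncIterator[str]:
    # Made-up chunks standing in for an assistant's streamed answer.
    for chunk in ["I'm ", "a ", "demo ", "assistant "]:
        yield chunk


class LazyMessage:
    # Hypothetical stand-in; this is not Ragna's message class.
    def __init__(self, stream: AsyncIterator[str]) -> None:
        self._stream = stream
        self._content: Optional[str] = None

    async def __aiter__(self) -> AsyncIterator[str]:
        chunks = []
        async for chunk in self._stream:
            chunks.append(chunk)
            yield chunk
        # The full content is only known once the stream is exhausted.
        self._content = "".join(chunks)

    async def read(self) -> str:
        # Consume the stream if that has not happened yet.
        if self._content is None:
            async for _ in self:
                pass
        return str(self)

    def __str__(self) -> str:
        if self._content is None:
            raise RuntimeError("iterate over the message or read it first")
        return self._content


async def main() -> tuple[Optional[str], list[str], str]:
    message = LazyMessage(fake_stream())
    try:
        str(message)  # fails: nothing has been streamed yet
        error = None
    except RuntimeError as exc:
        error = str(exc)
    chunks = [chunk async for chunk in message]
    return error, chunks, str(message)


error, chunks, content = asyncio.run(main())
print(error)
print(chunks)
print(content)
```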

REST API

from ragna.deploy import Config

config = Config(assistants=[DemoStreamingAssistant])

ragna_deploy = ragna_docs.RagnaDeploy(config)

client, document = ragna_deploy.get_http_client(
    authenticate=True, upload_sample_document=True
)

Start and prepare the chat.

chat = (
    client.post(
        "/api/chats",
        json={
            "name": "Tutorial REST API",
            "input": [document["id"]],
            "source_storage": source_storages.RagnaDemoSourceStorage.display_name(),
            "assistant": DemoStreamingAssistant.display_name(),
        },
    )
    .raise_for_status()
    .json()
)

client.post(f"/api/chats/{chat['id']}/prepare").raise_for_status()

Out:

<Response [200 OK]>

The response is streamed as JSONL (JSON Lines). Each line of the response body is valid JSON and corresponds to one chunk.

import json


with client.stream(
    "POST",
    f"/api/chats/{chat['id']}/answer",
    json={"prompt": "What is Ragna?", "stream": True},
) as response:
    chunks = [json.loads(data) for data in response.iter_lines()]
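
The line-by-line decoding can also be tried without a running REST API. The following is a minimal sketch: `parse_jsonl` is a hypothetical helper, and the sample lines are made up to resemble a streamed response. Skipping blank lines is a defensive assumption; nothing in this example indicates that Ragna emits them.

```python
import json
from typing import Any, Iterable, Iterator


def parse_jsonl(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    # Hypothetical helper, not part of Ragna: decode one JSON object per line.
    for line in lines:
        if not line.strip():
            # Defensive: ignore blank lines instead of failing on them.
            continue
        yield json.loads(line)


# Made-up sample lines in the shape of a streamed response.
raw_lines = [
    '{"content": "I\'m ", "role": "assistant"}',
    '{"content": "a ", "role": "assistant"}',
    "",
    '{"content": "demo ", "role": "assistant"}',
]

chunks = list(parse_jsonl(raw_lines))
print(len(chunks))
print("".join(chunk["content"] for chunk in chunks))
```

Because the helper is a generator, each chunk can be handled as soon as its line arrives instead of waiting for the full response.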

The first chunk contains the full message object, including the sources, along with the first chunk of the content.

print(len(chunks))
print(json.dumps(chunks[0], indent=2))

Out:

55
{
  "id": "8f30f6df-f79c-4032-b830-37811d01fdfe",
  "content": "I'm ",
  "role": "assistant",
  "sources": [
    {
      "id": "5ba07b80-9422-4ffe-b62b-67dfa15a9c63",
      "document_id": "7a43b7fd-a48d-4edc-805f-2066facb67df",
      "document_name": "ragna.txt",
      "location": "",
      "content": "Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]",
      "num_tokens": 17
    }
  ],
  "timestamp": "2025-01-16T10:14:14.622337Z"
}

Subsequent chunks no longer contain the sources.

print(json.dumps(chunks[1], indent=2))

Out:

{
  "id": "8f30f6df-f79c-4032-b830-37811d01fdfe",
  "content": "a ",
  "role": "assistant",
  "sources": null,
  "timestamp": "2025-01-16T10:14:14.622337Z"
}

Joining the content of the chunks together results in the full message.

print("".join(chunk["content"] for chunk in chunks))

Out:

I'm a demo assistant and can be used to try Ragna's workflow.
I will only mirror back my inputs. 

So far I have received 1 messages.

Your last prompt was:

> What is Ragna?

These are the sources I was given:

- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...] 
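
Since only the first chunk carries the sources, a client that wants a single message object at the end has to combine the two pieces of information. The following is a minimal sketch of one way to do that: `assemble_message` is a hypothetical helper, and the chunk dictionaries are shortened, made-up versions of the ones printed above.

```python
from typing import Any


def assemble_message(chunks: list[dict[str, Any]]) -> dict[str, Any]:
    # Hypothetical helper, not part of Ragna.
    if not chunks:
        raise ValueError("expected at least one chunk")
    # Start from a copy of the first chunk, since only it carries the sources.
    message = dict(chunks[0])
    # Replace its partial content with the concatenation of all chunks.
    message["content"] = "".join(chunk["content"] for chunk in chunks)
    return message


# Shortened, made-up chunks in the shape shown above.
chunks = [
    {"id": "msg-1", "content": "I'm ", "sources": [{"document_name": "ragna.txt"}]},
    {"id": "msg-1", "content": "a ", "sources": None},
    {"id": "msg-1", "content": "demo ", "sources": None},
]

message = assemble_message(chunks)
print(message["content"])
print(message["sources"])
```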

Before we close the example, let's terminate the REST API and have a look at what would have been printed in the terminal if we had started it with the ragna deploy command.

ragna_deploy.terminate()

Out:

INFO:     127.0.0.1:54612 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "GET /login HTTP/1.1" 303 See Other
INFO:     127.0.0.1:54614 - "GET /oauth-callback HTTP/1.1" 303 See Other
INFO:     127.0.0.1:54614 - "GET / HTTP/1.1" 303 See Other
INFO:     127.0.0.1:54614 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "POST /api/documents HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "PUT /api/documents HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "POST /api/chats HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "POST /api/chats/5fade345-dcfc-43ab-bf31-6e7319fd528b/prepare HTTP/1.1" 200 OK
INFO:     127.0.0.1:54614 - "POST /api/chats/5fade345-dcfc-43ab-bf31-6e7319fd528b/answer HTTP/1.1" 200 OK
