Streaming messages
Ragna supports streaming responses from the assistant. This example shows how to stream messages using the Python API and the REST API.
Tip
Of the assistants that Ragna has built in, the following ones support streaming:
Setup streaming assistant
An assistant can only stream a message if it supports streaming. For this example, we subclass ragna.assistants.RagnaDemoAssistant, split its message on spaces, and yield the individual chunks.
from ragna import assistants
class DemoStreamingAssistant(assistants.RagnaDemoAssistant):
    def answer(self, messages):
        content = next(super().answer(messages))
        for chunk in content.split(" "):
            yield f"{chunk} "
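The chunking logic can be tried without Ragna. The following is a minimal sketch, assuming a hypothetical helper `split_into_chunks` that is not part of Ragna; it mirrors the loop in `answer` above on a plain string. Because `str.split(" ")` splits on single spaces only, newlines stay inside the chunks, and because a space is appended to every chunk, the joined result ends with one extra trailing space.

```python
from typing import Iterator


def split_into_chunks(content: str) -> Iterator[str]:
    # Hypothetical stand-in for the loop in DemoStreamingAssistant.answer:
    # split on single spaces and re-append the space to every chunk.
    for chunk in content.split(" "):
        yield f"{chunk} "


text = "I'm a demo assistant.\nI will only mirror back my inputs."
chunks = list(split_into_chunks(text))
print(chunks)

# Joining the chunks restores the text plus one trailing space.
print("".join(chunks) == text + " ")
```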
Python API
Let's create and prepare a chat using the assistant we have defined above.
from pathlib import Path
import ragna._docs as ragna_docs
from ragna import Rag, source_storages
print(ragna_docs.SAMPLE_CONTENT)
document_path = Path.cwd() / "ragna.txt"
with open(document_path, "w") as file:
    file.write(ragna_docs.SAMPLE_CONTENT)
chat = Rag().chat(
    input=[document_path],
    source_storage=source_storages.RagnaDemoSourceStorage,
    assistant=DemoStreamingAssistant,
)
_ = await chat.prepare()
Out:
Ragna is an open source project built by Quansight. It is designed to allow
organizations to explore the power of Retrieval-augmented generation (RAG) based
AI tools. Ragna provides an intuitive API for quick experimentation and built-in
tools for creating production-ready applications allowing you to quickly leverage
Large Language Models (LLMs) for your work.
The Ragna website is https://ragna.chat/. The source code is available at
https://github.com/Quansight/ragna under the BSD 3-Clause license.
Set the stream=True flag when calling ragna.core.Chat.answer.
message = await chat.answer("What is Ragna?", stream=True)
At this stage, we cannot access the content of the message, e.g. by printing it.
try:
    print(message)
except Exception as error:
    print(f"{type(error).__name__}: {error}")
Out:
RuntimeError: Message content cannot be accessed without having iterated over it, e.g. `async for chunk in message`, or reading the content, e.g. `await message.read()`, first.
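One way to picture this behavior is a message whose content only becomes known once its stream has been consumed. The following is a minimal sketch of that pattern and not Ragna's implementation: the class `LazyMessage`, its attributes, and its error text are hypothetical and exist only for illustration.

```python
import asyncio


class LazyMessage:
    """Hypothetical illustration only; this is not Ragna's implementation."""

    def __init__(self, chunks):
        self._chunks = chunks
        self._content = None  # unknown until the stream has been consumed

    async def __aiter__(self):
        collected = []
        for chunk in self._chunks:
            await asyncio.sleep(0)  # stand-in for waiting on the assistant
            collected.append(chunk)
            yield chunk
        # Only reached once the stream has been fully consumed.
        self._content = "".join(collected)

    async def read(self):
        # Consume the stream if that has not happened yet.
        if self._content is None:
            async for _ in self:
                pass
        return self._content

    def __str__(self):
        if self._content is None:
            raise RuntimeError("content is not available before iterating or reading")
        return self._content


message = LazyMessage(["I'm ", "a ", "demo "])
try:
    print(message)
    failed_before_read = False
except RuntimeError:
    failed_before_read = True

content = asyncio.run(message.read())
print(content)
print(message)  # works now that the content has been read
```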
To get the individual chunks, we asynchronously iterate over the message.
chunks = [chunk async for chunk in message]
print(len(chunks))
print(chunks)
Out:
55
["I'm ", 'a ', 'demo ', 'assistant ', 'and ', 'can ', 'be ', 'used ', 'to ', 'try ', "Ragna's ", 'workflow.\nI ', 'will ', 'only ', 'mirror ', 'back ', 'my ', 'inputs. ', '\n\nSo ', 'far ', 'I ', 'have ', 'received ', '1 ', 'messages.\n\nYour ', 'last ', 'prompt ', 'was:\n\n> ', 'What ', 'is ', 'Ragna?\n\nThese ', 'are ', 'the ', 'sources ', 'I ', 'was ', 'given:\n\n- ', 'ragna.txt: ', 'Ragna ', 'is ', 'an ', 'open ', 'source ', 'project ', 'built ', 'by ', 'Quansight. ', 'It ', 'is ', 'designed ', 'to ', 'allow ', 'organizations ', 'to ', '[...] ']
Joining the chunks together results in the full message.
print("".join(chunks))
Out:
I'm a demo assistant and can be used to try Ragna's workflow.
I will only mirror back my inputs.
So far I have received 1 messages.
Your last prompt was:
> What is Ragna?
These are the sources I was given:
- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]
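Collecting all chunks in a list, as done above, waits for the whole stream to finish. A common alternative is to handle each chunk as soon as it arrives. The following is a minimal sketch of that pattern that runs without Ragna: `fake_stream` is a hypothetical async generator standing in for the streamed message, and the `asyncio.sleep` call only simulates latency.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for a streamed message; not part of Ragna.
    for chunk in ["I'm ", "a ", "demo ", "assistant "]:
        await asyncio.sleep(0.01)  # simulate the assistant producing output
        yield chunk


async def consume() -> str:
    received = []
    async for chunk in fake_stream():
        # Print each chunk immediately instead of waiting for the full message.
        print(chunk, end="", flush=True)
        received.append(chunk)
    print()
    return "".join(received)


full_message = asyncio.run(consume())
```

With a real chat, the same loop would iterate over the message returned by `chat.answer(..., stream=True)` in place of `fake_stream()`.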
REST API
from ragna.deploy import Config
config = Config(assistants=[DemoStreamingAssistant])
ragna_deploy = ragna_docs.RagnaDeploy(config)
client, document = ragna_deploy.get_http_client(
    authenticate=True, upload_sample_document=True
)
Start and prepare the chat
chat = (
    client.post(
        "/api/chats",
        json={
            "name": "Tutorial REST API",
            "input": [document["id"]],
            "source_storage": source_storages.RagnaDemoSourceStorage.display_name(),
            "assistant": DemoStreamingAssistant.display_name(),
        },
    )
    .raise_for_status()
    .json()
)
client.post(f"/api/chats/{chat['id']}/prepare").raise_for_status()
Out:
<Response [200 OK]>
Streaming the response is performed with JSONL. Each line in the response is valid JSON and corresponds to one chunk.
import json
with client.stream(
    "POST",
    f"/api/chats/{chat['id']}/answer",
    json={"prompt": "What is Ragna?", "stream": True},
) as response:
    chunks = [json.loads(data) for data in response.iter_lines()]
The first chunk contains the full message object, including the sources, along with the first chunk of the content.
print(len(chunks))
print(json.dumps(chunks[0], indent=2))
Out:
55
{
  "id": "8f30f6df-f79c-4032-b830-37811d01fdfe",
  "content": "I'm ",
  "role": "assistant",
  "sources": [
    {
      "id": "5ba07b80-9422-4ffe-b62b-67dfa15a9c63",
      "document_id": "7a43b7fd-a48d-4edc-805f-2066facb67df",
      "document_name": "ragna.txt",
      "location": "",
      "content": "Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]",
      "num_tokens": 17
    }
  ],
  "timestamp": "2025-01-16T10:14:14.622337Z"
}
Subsequent chunks no longer contain the sources.
print(json.dumps(chunks[1], indent=2))
Out:
{
  "id": "8f30f6df-f79c-4032-b830-37811d01fdfe",
  "content": "a ",
  "role": "assistant",
  "sources": null,
  "timestamp": "2025-01-16T10:14:14.622337Z"
}
Joining the content of the chunks together results in the full message.
print("".join(chunk["content"] for chunk in chunks))
Out:
I'm a demo assistant and can be used to try Ragna's workflow.
I will only mirror back my inputs.
So far I have received 1 messages.
Your last prompt was:
> What is Ragna?
These are the sources I was given:
- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]
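The JSONL handling above can be reproduced offline. The following is a minimal sketch, assuming a hard-coded list of lines shaped like the chunks printed above (the field values are illustrative placeholders, not real server output) and a hypothetical helper `merge_chunks` that is not part of Ragna. It takes the metadata and sources from the first chunk and concatenates the `content` of all chunks.

```python
import json

# Illustrative chunks modeled on the layout shown above; the values are
# placeholders and not real server output.
raw_chunks = [
    {
        "id": "example-id",
        "content": "I'm ",
        "role": "assistant",
        "sources": [{"document_name": "ragna.txt"}],
    },
    {"id": "example-id", "content": "a ", "role": "assistant", "sources": None},
    {"id": "example-id", "content": "demo ", "role": "assistant", "sources": None},
]
# One JSON document per line, as in a JSONL response body.
lines = [json.dumps(chunk) for chunk in raw_chunks]


def merge_chunks(jsonl_lines):
    # Hypothetical helper, not part of Ragna.
    chunks = [json.loads(line) for line in jsonl_lines if line.strip()]
    if not chunks:
        raise ValueError("expected at least one chunk")
    # The first chunk carries the sources; later chunks only add content.
    message = dict(chunks[0])
    message["content"] = "".join(chunk["content"] for chunk in chunks)
    return message


message = merge_chunks(lines)
print(json.dumps(message, indent=2))
```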
Before we close the example, let's terminate the REST API and have a look at what
would have been printed in the terminal if we had started it with the ragna deploy
command.
ragna_deploy.terminate()
Out:
INFO: 127.0.0.1:54612 - "GET /health HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "GET /login HTTP/1.1" 303 See Other
INFO: 127.0.0.1:54614 - "GET /oauth-callback HTTP/1.1" 303 See Other
INFO: 127.0.0.1:54614 - "GET / HTTP/1.1" 303 See Other
INFO: 127.0.0.1:54614 - "GET /docs HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "POST /api/documents HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "PUT /api/documents HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "POST /api/chats HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "POST /api/chats/5fade345-dcfc-43ab-bf31-6e7319fd528b/prepare HTTP/1.1" 200 OK
INFO: 127.0.0.1:54614 - "POST /api/chats/5fade345-dcfc-43ab-bf31-6e7319fd528b/answer HTTP/1.1" 200 OK