Streaming messages
Ragna supports streaming responses from the assistant. This example shows how to stream a response with both the Python API and the REST API.
Before we start this example, we import some helpers.
import sys
from pathlib import Path
sys.path.insert(0, str(Path.cwd().parent))
import documentation_helpers
Setup streaming assistant
To stream a message from an assistant, the assistant itself needs to support streaming. For this example, we subclass ragna.assistants.RagnaDemoAssistant, split its message on spaces, and yield the individual chunks.
Tip
Of the assistants that Ragna has built in, the following ones support streaming:
from ragna import assistants
class DemoStreamingAssistant(assistants.RagnaDemoAssistant):
    def answer(self, prompt, sources):
        content = next(super().answer(prompt, sources))
        for chunk in content.split(" "):
            yield f"{chunk} "
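The chunking logic does not depend on Ragna and can be tried on a plain string. The following is a minimal sketch; the helper name `split_into_chunks` and the sample text are assumptions made for illustration and are not part of Ragna.

```python
def split_into_chunks(content):
    """Yield each space-separated piece of content followed by one space."""
    for chunk in content.split(" "):
        yield f"{chunk} "


text = "Ragna supports streaming"
chunks = list(split_into_chunks(text))
print(chunks)

# Joining the chunks restores the text plus one trailing space,
# because every chunk, including the last one, ends with a space.
print("".join(chunks) == text + " ")
```

Because the split is on a single space rather than on all whitespace, newlines stay inside the chunks.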
Python API
Let's create and prepare a chat using the assistant we have defined above.
from ragna import Rag, source_storages
document_path = documentation_helpers.assets / "ragna.txt"
chat = Rag().chat(
    documents=[document_path],
    source_storage=source_storages.RagnaDemoSourceStorage,
    assistant=DemoStreamingAssistant,
)
_ = await chat.prepare()
message = await chat.answer("What is Ragna?", stream=True)
At this stage, we cannot access the content of the message, e.g. by printing it.
try:
    print(message)
except Exception as error:
    print(f"{type(error).__name__}: {error}")
Out:
RuntimeError: Message content cannot be accessed without having iterated over it, e.g. `async for chunk in message`, or reading the content, e.g. `await message.read()`, first.
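The error follows from the fact that the full content does not exist until the stream has been consumed. The sketch below imitates this behavior with a toy class. `StreamedMessage` and `fake_stream` are hypothetical names, and the code is not Ragna's implementation; it only illustrates the idea of content that becomes available after iterating or reading.

```python
import asyncio


class StreamedMessage:
    """Toy stand-in for a streamed message; not Ragna's implementation."""

    def __init__(self, chunk_stream):
        self._chunk_stream = chunk_stream
        self._content = None  # unknown until the stream has been consumed

    async def __aiter__(self):
        parts = []
        async for chunk in self._chunk_stream:
            parts.append(chunk)
            yield chunk
        self._content = "".join(parts)

    async def read(self):
        # Consume the stream if that has not happened yet.
        if self._content is None:
            async for _ in self:
                pass
        return self._content

    @property
    def content(self):
        if self._content is None:
            raise RuntimeError("content is not available before the stream is consumed")
        return self._content


async def fake_stream():
    for chunk in ["I'm ", "a ", "demo "]:
        yield chunk


async def main():
    message = StreamedMessage(fake_stream())
    try:
        message.content
    except RuntimeError as error:
        early_error = str(error)
    else:
        early_error = None
    content = await message.read()
    return early_error, content, message.content


early_error, content, content_after = asyncio.run(main())
print(early_error)
print(content)
```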
To get the individual chunks, we asynchronously iterate over the message.
chunks = [chunk async for chunk in message]
print(len(chunks))
print(chunks)
Out:
48
["I'm ", 'a ', 'demo ', 'assistant ', 'and ', 'can ', 'be ', 'used ', 'to ', 'try ', 'Ragnas ', 'workflow.\nI ', 'will ', 'only ', 'mirror ', 'back ', 'my ', 'inputs. ', '\n\nYour ', 'prompt ', 'was:\n\n> ', 'What ', 'is ', 'Ragna?\n\nThese ', 'are ', 'the ', 'sources ', 'I ', 'was ', 'given:\n\n- ', 'ragna.txt: ', 'Ragna ', 'is ', 'an ', 'open ', 'source ', 'project ', 'built ', 'by ', 'Quansight. ', 'It ', 'is ', 'designed ', 'to ', 'allow ', 'organizations ', 'to ', '[...] ']
Joining the chunks together results in the full message.
print("".join(chunks))
Out:
I'm a demo assistant and can be used to try Ragnas workflow.
I will only mirror back my inputs.
Your prompt was:
> What is Ragna?
These are the sources I was given:
- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]
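Collecting all chunks before printing hides the benefit of streaming. As a rough sketch, chunks can instead be displayed as soon as they arrive. `fake_message` is a hypothetical stand-in for a streamed message so that the snippet runs without Ragna, and the delay only simulates waiting for the assistant.

```python
import asyncio


async def fake_message():
    """Stand-in for a streamed message; yields chunks with a small delay."""
    for chunk in ["Ragna ", "is ", "an ", "open ", "source ", "project. "]:
        await asyncio.sleep(0.01)  # simulate waiting for the assistant
        yield chunk


async def render(message):
    """Print each chunk as soon as it arrives and return the full text."""
    received = []
    async for chunk in message:
        print(chunk, end="", flush=True)
        received.append(chunk)
    print()
    return "".join(received)


full_text = asyncio.run(render(fake_message()))
```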
REST API
from ragna.deploy import Config
config = Config(assistants=[DemoStreamingAssistant])
rest_api = documentation_helpers.RestApi()
client = rest_api.start(config, authenticate=True)
Out:
Starting Ragna REST API
...
Upload the document.
document_upload = (
    client.post("/document", json={"name": document_path.name})
    .raise_for_status()
    .json()
)
document = document_upload["document"]
parameters = document_upload["parameters"]
client.request(
    parameters["method"],
    parameters["url"],
    data=parameters["data"],
    files={"file": open(document_path, "rb")},
).raise_for_status()
Out:
<Response [200 OK]>
Start and prepare the chat.
chat = (
    client.post(
        "/chats",
        json={
            "name": "Tutorial REST API",
            "documents": [document],
            "source_storage": source_storages.RagnaDemoSourceStorage.display_name(),
            "assistant": DemoStreamingAssistant.display_name(),
            "params": {},
        },
    )
    .raise_for_status()
    .json()
)
client.post(f"/chats/{chat['id']}/prepare").raise_for_status()
Out:
<Response [200 OK]>
Streaming the response is performed with JSONL. Each line in the response is valid JSON and corresponds to one chunk.
import json
with client.stream(
    "POST",
    f"/chats/{chat['id']}/answer",
    json={"prompt": "What is Ragna?", "stream": True},
) as response:
    chunks = [json.loads(data) for data in response.iter_lines()]
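Since every line is a self-contained JSON document, the lines can also be parsed one at a time instead of being collected into a list first. The sketch below runs on hard-coded sample lines; `iter_chunks` is a hypothetical helper, and the sample payloads are simplified assumptions rather than real server output. In the example above, `response.iter_lines()` would take the place of `lines`.

```python
import json


def iter_chunks(lines):
    """Lazily parse an iterable of JSONL lines, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield json.loads(line)


# Simplified sample lines; real chunks carry more fields.
lines = [
    '{"content": "Ragna ", "sources": [{"document": {"name": "ragna.txt"}}]}',
    "",
    '{"content": "is ", "sources": null}',
    '{"content": "open source.", "sources": null}',
]

pieces = []
for chunk in iter_chunks(lines):
    pieces.append(chunk["content"])

print("".join(pieces))
```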
The first chunk contains the full message object, including the sources, along with the first chunk of the content.
print(len(chunks))
print(json.dumps(chunks[0], indent=2))
Out:
48
{
"id": "4d49e1af-7d2f-4500-8099-daf5c8deec51",
"content": "I'm ",
"role": "assistant",
"sources": [
{
"id": "f86a79b4-50d5-40c7-86f7-4c8a832b09bb",
"document": {
"id": "9b229b09-f17b-4082-ba21-e53d096eb940",
"name": "ragna.txt"
},
"location": "",
"content": "Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]",
"num_tokens": 17
}
],
"timestamp": "2024-04-21T13:59:53.827535"
}
Subsequent chunks no longer contain the sources.
print(json.dumps(chunks[1], indent=2))
Out:
{
"id": "4d49e1af-7d2f-4500-8099-daf5c8deec51",
"content": "a ",
"role": "assistant",
"sources": null,
"timestamp": "2024-04-21T13:59:53.827535"
}
Joining the content of the chunks together results in the full message.
print("".join(chunk["content"] for chunk in chunks))
Out:
I'm a demo assistant and can be used to try Ragnas workflow.
I will only mirror back my inputs.
Your prompt was:
> What is Ragna?
These are the sources I was given:
- ragna.txt: Ragna is an open source project built by Quansight. It is designed to allow organizations to [...]
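Because only the first chunk carries the sources, a client that wants a single complete message object has to combine the metadata of the first chunk with the joined content of all chunks. The following is one possible sketch; `assemble_message` is a hypothetical helper and the sample chunks are shortened, made-up values shaped like the output above.

```python
def assemble_message(chunks):
    """Merge streamed chunk objects into one message dictionary."""
    if not chunks:
        raise ValueError("expected at least one chunk")
    # The first chunk carries the id, role, and sources.
    message = dict(chunks[0])
    message["content"] = "".join(chunk["content"] for chunk in chunks)
    return message


sample_chunks = [
    {
        "id": "abc",
        "content": "I'm ",
        "role": "assistant",
        "sources": [{"document": {"name": "ragna.txt"}}],
    },
    {"id": "abc", "content": "a ", "role": "assistant", "sources": None},
    {"id": "abc", "content": "demo.", "role": "assistant", "sources": None},
]

message = assemble_message(sample_chunks)
print(message["content"])
print(message["sources"])
```

Copying the first chunk with `dict(...)` leaves the original chunk list unchanged.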
Before we close the example, let's stop the REST API and have a look at what would have been printed in the terminal if we had started it the regular way.
rest_api.stop()
Out:
INFO: RagnaDemoAuthentication: You can log in with any username and a matching
password.
INFO: 127.0.0.1:40104 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "POST /token HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "POST /document HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "PUT /document HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "POST /chats HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "POST /chats/c163adea-b05e-4fc4-a160-6554eefebb1a/prepare HTTP/1.1" 200 OK
INFO: 127.0.0.1:40104 - "POST /chats/c163adea-b05e-4fc4-a160-6554eefebb1a/answer HTTP/1.1" 200 OK