Streaming OpenAI in Elixir Phoenix Part II
This is part II in a series about streaming OpenAI chat completions in Elixir.
- In part I, we implemented a module and API endpoint for streaming chat completions.
- Part II (this post) revisits stream parsing and why you may want stateful parsing.
- Part III uses Phoenix LiveView to stream completions to users connected to your site.
You may not need this
The previous post implements a working integration against OpenAI. In practice I have not encountered the problems this post describes and aims to solve. It may be best to stick with the simpler implementation and leave this as more of an intellectual exercise.
I did not thoroughly investigate, but I did not see these potential problems addressed in the community-maintained OpenAI Elixir packages either. Please correct me if I’m wrong!
Revisiting the parser
In the previous post, we implemented streaming event parsing using the following code:
defp parse(chunk) do
  chunk
  |> String.split("data: ")
  |> Enum.map(&String.trim/1)
  |> Enum.map(&decode/1)
  |> Enum.reject(&is_nil/1)
end

defp decode(""), do: nil
defp decode("[DONE]"), do: nil
defp decode(data), do: Jason.decode!(data)
The input to parse/1 (the chunk) was assumed to be zero or more complete events, e.g.:
data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}]}
However, what if we were to receive a partial event? That is, what if the bytes that comprise one or more of the events in the stream arrive at different times? For example, let’s take the first event from above and say it arrived in stages.
Stage 1:
data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745
Stage 2:
461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}
If this were to happen, the parser would fail to extract the events correctly, as it currently assumes the entire event is present when invoking parse/1.
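To see the failure concretely, here’s a sketch of what would happen if parse/1 received only the first stage (parse/1 is private, so imagine it pasted into an iex session; the exact error position depends on the input):

partial = ~s|data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion|

parse(partial)
# String.split/2 yields the truncated JSON, decode/1 hands it to
# Jason.decode!/1, which raises:
# ** (Jason.DecodeError) unexpected end of input at position ...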
HTTP buffering
Some server environments will buffer data and send the data to the client once the buffer reaches a certain size. Even when a web app’s server does not do this, it’s possible there is a proxy (or some other middleman) that sits between the source server and the client. If this is the case, it’s possible that a client reading from a stream of events will receive portions of an event at a time and thus be responsible for stringing them back together.
If we want a robust implementation that can withstand this scenario, we’ll need to introduce state into our parser.
Note: In practice, I frequently work with streaming APIs and have implemented zero-dependency clients against OpenAI, Anthropic, Cohere, Google, and more without ever experiencing this issue.
Updating our parser
Before introducing state, let’s rework the parser to remove the assumption that an entire event is present at once.
Among the greatest joys of working with Elixir is iterating over binaries (strings) and using pattern matching to extract or otherwise modify the input in some way. Whether or not we need to optimize our parser around the above scenario, reworking it in this way will be a fun exercise!
Let’s reimplement the parser to take a buffer argument and iterate over the binary using pattern matching and recursion. We’ll add two new functions, parse/2 and parse/3.
def parse(buffer, chunk) do
  parse(buffer, chunk, [])
end

defp parse(buffer, chunk, events) do
  # TODO
end
parse/2 will be the public interface for callers, while parse/3 will implement the parsing logic. The arguments are:
- buffer — This will hold the intermediate contents of a single event while we iterate over a given chunk. Later, when we introduce state, this buffer will persist between chunks that arrive separately in the stream.
- chunk — This is a single chunk of data received from the stream of all chunks, which may contain zero or more complete or incomplete events.
- events — A single chunk may contain more than one event, so we’ll keep a list of the events we’ve parsed and return them when parsing is complete.
defp parse([buffer | "\n"], "\n" <> rest, events) do
case IO.iodata_to_binary(buffer) do
"data: [DONE]" ->
parse([], rest, events)
"data: " <> event ->
parse([], rest, [Jason.decode!(event) | events])
end
end
defp parse(buffer, <<char::utf8, rest::binary>>, events) do
parse([buffer | <<char::utf8>>], rest, events)
end
defp parse(buffer, "", events) do
{buffer, Enum.reverse(events)}
end
That’s a lot of functionality in only a few lines of code, so let’s unpack it. At a high level:
- The first clause matches when we have two consecutive newlines present, which is the event separator. In this case, we either add the (JSON-decoded) event to the list of events and continue or skip it if it signals the end of the stream.
- The second clause matches when the chunk is non-empty (but is also not the event separator). In this case, we extract the first utf8 character, add it to our buffer, and continue parsing the rest of the chunk.
- The third clause matches when there are no more bytes left in the chunk to parse. In this case, we return a tuple consisting of the buffer and the (reversed) list of events. We reverse the events because we prepended them to the list during parsing so as not to waste memory due to Elixir’s immutable data structures (adding an item to the head of a list is constant in time and memory).
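Here’s a quick sketch of these clauses at work in iex, assuming they live in our MyApp.Openai module:

# A complete event followed by the "\n\n" separator parses fully:
iex> MyApp.Openai.parse([], ~s|data: {"n":1}\n\n|)
{[], [%{"n" => 1}]}

# An event that hasn’t reached its separator stays behind as iodata:
iex> {buffer, [%{"n" => 1}]} = MyApp.Openai.parse([], ~s|data: {"n":1}\n\ndata: {"n":|)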
Pattern matching is used extensively here, not only in the function heads but also in a case statement. We destructure binaries in multiple places using declarative syntax. Modifiers like ::utf8 ensure that we don’t naively destructure a subset of a single character but entire characters at a time (since UTF-8 is variable-width). For more on this, I recommend the Elixir guides and The Absolute Minimum Every Software Developer Must Know About Unicode in 2023.
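To see what the ::utf8 modifier buys us, compare matching a full character against naively matching a single byte:

# ::utf8 consumes however many bytes the character needs:
iex> <<char::utf8, _rest::binary>> = "étude"
iex> char
233

# A plain byte match splits the two-byte "é" in half:
iex> <<byte, _rest::binary>> = "étude"
iex> byte
195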
Lastly, we use iodata as our buffer for efficiency. Naively, we could have used a string as our buffer and appended each utf8 character to it as we went. However, doing so would result in an explosion in memory consumption because each append operation would copy the entire string and create a new one with the new character appended. Instead, we add each character to a nested list with one call at the end to convert it into a string.
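Here’s a sketch of the buffer growing one character at a time. Each cons is constant-time, and nothing is copied until the single IO.iodata_to_binary/1 call at the end:

buffer = []
buffer = [buffer | "H"]  # [[] | "H"]
buffer = [buffer | "i"]  # [[[] | "H"] | "i"]
buffer = [buffer | "!"]  # [[[[] | "H"] | "i"] | "!"]

IO.iodata_to_binary(buffer)
# => "Hi!"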
An aside on binary pattern matching
I’ve written many low-level stream parsers in TypeScript using TransformStreams and Uint8Arrays, and I can confidently say it is SO MUCH MORE enjoyable to do this in Elixir.
This is one of those simple ideas which after you have seen it makes you wonder how any language could be without it.
— Joe Armstrong on binary pattern matching and the bit syntax in A History of Erlang
A quick test
To make this concrete and ensure it’s working, let’s add a quick unit test. Create test/my_app/openai_test.exs and add the following code.
defmodule MyApp.OpenaiTest do
  use MyApp.DataCase

  alias MyApp.Openai

  test "can parse complete chunks" do
    event_one = %{
      "choices" => [
        %{
          "delta" => %{"content" => "Hello"},
          "finish_reason" => nil,
          "index" => 0,
          "logprobs" => nil
        }
      ],
      "created" => 1_704_745_461,
      "id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
      "model" => "gpt-3.5-turbo-0613",
      "object" => "chat.completion.chunk",
      "system_fingerprint" => nil
    }

    event_two = %{
      "choices" => [
        %{"delta" => %{"content" => "!"}, "finish_reason" => nil, "index" => 0, "logprobs" => nil}
      ],
      "created" => 1_704_745_461,
      "id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
      "model" => "gpt-3.5-turbo-0613",
      "object" => "chat.completion.chunk",
      "system_fingerprint" => nil
    }

    # Note the blank lines: each event ends with "\n\n", the
    # separator our parser matches on.
    chunk = """
    data: #{Jason.encode!(event_one)}

    data: #{Jason.encode!(event_two)}

    data: [DONE]

    """

    assert {[], [^event_one, ^event_two]} = Openai.parse([], chunk)
  end
end
Hopefully this example clarifies the expected inputs and outputs to parse/2.
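Assuming a standard mix project layout, you can run just this file with:

mix test test/my_app/openai_test.exs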
Swapping out parse/1
At this point, we can replace the previous parsing logic, parse/1, with parse/2.
def chat_completion(request, callback) do
  Req.post(@chat_completions_url,
    json: set_stream(request, true),
    auth: {:bearer, api_key()},
    into: fn {:data, data}, acc ->
-     Enum.each(parse(data), callback)
+     {_buffer, events} = parse([], data)
+     Enum.each(events, callback)
      {:cont, acc}
    end
  )
end

- defp parse(chunk) do
-   chunk
-   |> String.split("data: ")
-   |> Enum.map(&String.trim/1)
-   |> Enum.map(&decode/1)
-   |> Enum.reject(&is_nil/1)
- end
-
- defp decode(""), do: nil
- defp decode("[DONE]"), do: nil
- defp decode(data), do: Jason.decode!(data)
Introducing state
It’s time to address the problem we described above: if a streaming chunk arrives containing an incomplete event, our previous parsing logic will fail. Our new parsing logic will also come up short as currently written (we pass an empty buffer for every chunk), but we can change that by introducing state.
Before we continue, let’s add a test that makes it clear where state is needed.
test "can parse incomplete chunks" do
chunk_one =
"""
data: {"id":"chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08","object":"chat.completion.chunk","created":1704745
"""
|> String.trim()
chunk_two = """
461,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}
"""
expected_event = %{
"choices" => [
%{
"delta" => %{"content" => "Hello"},
"finish_reason" => nil,
"index" => 0,
"logprobs" => nil
}
],
"created" => 1_704_745_461,
"id" => "chatcmpl-8eqSTjwUmipuvL2s8CzjmXd1dTS08",
"model" => "gpt-3.5-turbo-0613",
"object" => "chat.completion.chunk",
"system_fingerprint" => nil
}
assert {buffer, []} = Openai.parse([], chunk_one)
assert {[], [^expected_event]} = Openai.parse(buffer, chunk_two)
end
In this test case, notice how it takes two calls to parse/2 in order to construct a single event, with the buffer from the first call passed to the second. This is where we’ll need to keep state—between calls to parse/2.
Admittedly, this is where Elixir can feel more difficult than a language like TypeScript, Go, etc. Since Elixir’s data is immutable, we cannot simply update a variable or modify an existing data structure. To solve this, Elixir—more accurately, the underlying virtual machine—has a notion of processes, and processes are stateful. While this feels like friction at first, processes are useful in ways beyond state, and thus we often gain other benefits when leveraging them (e.g., concurrency, fault tolerance).
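To make that concrete, here’s a rough sketch (the Counter module is mine, purely for illustration) of how a bare process holds state: it recurses with the updated value and communicates via messages.

defmodule Counter do
  def start, do: spawn(fn -> loop(0) end)

  # Each recursive call carries the updated state forward.
  defp loop(count) do
    receive do
      :increment ->
        loop(count + 1)

      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)
    end
  end
end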
Thankfully, we do not have to deal with most of the mechanics of processes because Elixir provides a built-in abstraction around state called agents. Let’s update chat_completion/2 to use an agent.
def chat_completion(request, callback) do
  # Initialize buffer state
  {:ok, agent} = Agent.start_link(fn -> [] end)

  response =
    Req.post(@chat_completions_url,
      json: set_stream(request, true),
      auth: {:bearer, api_key()},
      into: fn {:data, data}, acc ->
        # Get previous buffer value
        buffer = Agent.get(agent, & &1)

        {buffer, events} = parse(buffer, data)
        Enum.each(events, callback)

        # Update buffer value with the result from calling parse/2
        :ok = Agent.update(agent, fn _ -> buffer end)

        {:cont, acc}
      end
    )

  # Make sure we shut the agent down
  :ok = Agent.stop(agent)

  response
end
The agent preserves the buffer between the arrival of different chunks. With that, our parser is now stateful and can handle chunks containing incomplete events.
Conclusion
Parsing binaries in Elixir is some of my favorite code to write (I wrote a parser for the KDL language largely for this reason). Perhaps that’s because of all the times I’ve had to do it without the luxury of pattern matching and the bit syntax.
The final part in this series takes our streaming chat completions and renders them in a UI using LiveView.