Streaming OpenAI in Elixir Phoenix
At Axflow, we are building open source tooling to help developers integrate large language models into their products. The majority of this tooling thus far has been written in TypeScript, though we’ve been exploring replacing part of our stack with a Phoenix application. We spend quite a bit of time working with OpenAI and given LLMs are all the rage, I wanted to write about how we’re integrating with it in our Phoenix application.
This post is the first in a series about integrating with OpenAI’s chat completions endpoint.
- Part I (this post) implements a module for streaming chat completions and a Phoenix API endpoint to stream the completions to clients.
- Part II goes deeper on stream parsing and introduces state for more robust handling of HTTP streams.
- Part III uses Phoenix LiveView to stream completions to users connected to your site.
Environment
If you follow along you’ll need a Phoenix application and an OpenAI API key. If you don’t already have a Phoenix application, you can follow the up and running guide.
Calling OpenAI without streaming
To start simple and work our way up, let’s first write code to call OpenAI’s chat completion endpoint without streaming the response.
Let’s add Req to the list of dependencies as we’ll need an HTTP client.
{:req, "~> 0.4.0"}
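If you’re following along in a fresh project, the dependency goes in the deps list of mix.exs; run mix deps.get afterwards to fetch it. A minimal sketch, assuming a standard generated project:

defp deps do
  [
    # ...the deps your generator already added...
    {:req, "~> 0.4.0"}
  ]
end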
Next, create a module named MyApp.Openai at lib/my_app/openai.ex. The initial requirements are:
- Be able to perform HTTP requests to OpenAI
- Ensure our HTTP requests are properly authenticated by their API
- Parse the JSON response into Elixir data structures
Req will automatically parse (non-streaming) JSON responses, leaving us to implement only the first two.
defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url, json: request)
  end
end
This code will call OpenAI’s chat completion endpoint, but without authentication the request will fail. OpenAI expects your API key in the Authorization header to authenticate to their API.
Before adding the API key to the request, the app must be configured with the key. In my case, I read the key from an environment variable in config/runtime.exs.
config :my_app, :openai, api_key: System.get_env("OPENAI_API_KEY")
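One aside: System.get_env/1 returns nil when the variable is unset, and you’d only find out later when OpenAI rejects the request. A stricter variant, my suggestion rather than the original setup, fails at boot instead:

# Raises at startup if OPENAI_API_KEY is unset, instead of failing
# later with an unauthenticated request.
config :my_app, :openai, api_key: System.fetch_env!("OPENAI_API_KEY")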
Now we need to tell Req to add this key to the request. Req even has some sugar for the Authorization header.
defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url,
      json: request,
      auth: {:bearer, api_key()}
    )
  end

  defp api_key() do
    Application.get_env(:my_app, :openai)[:api_key]
  end
end
We can open up a shell with iex -S mix to try it out:
iex> {:ok, %{body: response}} = MyApp.Openai.chat_completion(%{model: "gpt-3.5-turbo", messages: [%{role: "user", content: "Hello 3.5!"}]})
iex> response
%{
  "choices" => [
    %{
      "finish_reason" => "stop",
      "index" => 0,
      "message" => %{
        "content" => "Hello! How can I assist you today?",
        "role" => "assistant"
      }
    }
  ],
  # ...
}
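To pull the assistant’s reply out of the decoded response, get_in/2 with the Access helpers works nicely. For example, against the response bound above:

iex> get_in(response, ["choices", Access.at(0), "message", "content"])
"Hello! How can I assist you today?"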
Streaming the response
OpenAI’s API supports streaming responses. To stream the response, we need to set stream to true in the request body.
We can handle streams in Req in a few ways; here we’ll pass a callback function via the :into option.
Let’s add a second chat_completion function that takes two arguments. chat_completion/2 will be defined as:
def chat_completion(request, callback) do
  Req.post(@chat_completions_url,
    json: request,
    auth: {:bearer, api_key()},
    into: fn {:data, data}, context ->
      callback.(data)
      {:cont, context}
    end
  )
end
If we run this, setting stream to true in the request, we’ll see the response data as raw server-sent events.
MyApp.Openai.chat_completion(
  %{
    model: "gpt-3.5-turbo",
    stream: true,
    messages: [%{role: "user", content: "Hello 3.5!"}]
  },
  &IO.puts/1
)
Notice that the format for each event is data: <json-encoded data>\n\n, except the last, which is always data: [DONE]\n\n. For example:
data: {"id":"chatcmpl-8UmwRPfWQVApzdIPgAtpcF3RnpeEr","object":"chat.completion.chunk","created":1702348663,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
Note that some chunks in the streamed response contain multiple of these events, while others carry only one. We must keep this in mind when parsing.
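For illustration, a single chunk carrying two events might look like this (a made-up sketch with most fields omitted):

data: {"choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}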
Parsing the events into Elixir data structures
When we stream the response, we need to handle parsing the data ourselves. Thanks to Elixir, we don’t have to differentiate between raw bytes and language-level strings. We can also rest assured that Elixir will properly handle unicode, encodings, graphemes, etc. Parsing these events is therefore a simple string parsing exercise.
Given the following streaming input:
data: {"json":"object"}
data: {"json":"object 2"}
data: {"json":"object N"}
data: [DONE]
To parse it, we need to:

- Strip data: from the chunk
- Remove the trailing newlines
- Extract and parse the JSON object
This is trivial to express in Elixir:
defp parse(chunk) do
  chunk
  |> String.split("data: ")
  |> Enum.map(&String.trim/1)
  |> Enum.map(&decode/1)
  |> Enum.reject(&is_nil/1)
end

defp decode(""), do: nil
defp decode("[DONE]"), do: nil
defp decode(data), do: Jason.decode!(data)
The decode/1 function will receive some empty strings as a result of the string splitting, so we must ignore those. It must also ignore the final [DONE] event. Otherwise, we return the decoded JSON. Once parse/1 rejects the nil values, we’re left with only the decoded JSON.
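To make that concrete, here’s a hypothetical run (parse/1 is private, so this is for illustration only):

iex> MyApp.Openai.parse(~s(data: {"json":"object"}\n\ndata: [DONE]\n\n))
[%{"json" => "object"}]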
Lastly, we need to update chat_completion/2 to make use of parse/1. Remove the line that invokes the callback and replace it with the following:
def chat_completion(request, callback) do
  Req.post(@chat_completions_url,
    json: request,
    auth: {:bearer, api_key()},
    into: fn {:data, data}, acc ->
-     callback.(data)
+     Enum.each(parse(data), callback)
      {:cont, acc}
    end
  )
end
If we run our chat_completion/2 function with a callback that prints its argument, we’ll see each chunk of the stream as an Elixir map!
Improving the interface
Before moving on, one thing we should do to make our code robust is to ensure it’s used correctly.
We implemented two functions, chat_completion/1 and chat_completion/2; the first does not support streaming while the second expects it. Rather than ask callers to remember this, let’s remove the stream property from being a responsibility of the caller. We can add a function to ensure the request body’s stream property is set to the expected value in all cases.
def chat_completion(request) do
  Req.post(@chat_completions_url,
-   json: request,
+   json: set_stream(request, false),
    auth: {:bearer, api_key()}
  )
end

def chat_completion(request, callback) do
  Req.post(@chat_completions_url,
-   json: request,
+   json: set_stream(request, true),
    auth: {:bearer, api_key()},
    into: fn {:data, data}, acc ->
      Enum.each(parse(data), callback)
      {:cont, acc}
    end
  )
end

+ defp set_stream(request, value) do
+   request
+   |> Map.drop([:stream, "stream"])
+   |> Map.put(:stream, value)
+ end
With that, our module to handle both streaming and non-streaming calls to OpenAI’s chat completion endpoint is complete! Below is the final code:
defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url,
      json: set_stream(request, false),
      auth: {:bearer, api_key()}
    )
  end

  def chat_completion(request, callback) do
    Req.post(@chat_completions_url,
      json: set_stream(request, true),
      auth: {:bearer, api_key()},
      into: fn {:data, data}, acc ->
        Enum.each(parse(data), callback)
        {:cont, acc}
      end
    )
  end

  defp set_stream(request, value) do
    request
    |> Map.drop([:stream, "stream"])
    |> Map.put(:stream, value)
  end

  defp parse(chunk) do
    chunk
    |> String.split("data: ")
    |> Enum.map(&String.trim/1)
    |> Enum.map(&decode/1)
    |> Enum.reject(&is_nil/1)
  end

  defp decode(""), do: nil
  defp decode("[DONE]"), do: nil
  defp decode(data), do: Jason.decode!(data)

  defp api_key() do
    Application.get_env(:my_app, :openai)[:api_key]
  end
end
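As a usage sketch of my own (not part of the module above), here’s one way to accumulate the streamed deltas into a single string using an Agent. The get_in path follows the chunk format shown earlier:

# Start an Agent holding the accumulated assistant message.
{:ok, agent} = Agent.start_link(fn -> "" end)

MyApp.Openai.chat_completion(
  %{model: "gpt-3.5-turbo", messages: [%{role: "user", content: "Hello 3.5!"}]},
  fn event ->
    # Not every event carries content (e.g. the final "stop" chunk).
    delta = get_in(event, ["choices", Access.at(0), "delta", "content"]) || ""
    Agent.update(agent, &(&1 <> delta))
  end
)

Agent.get(agent, & &1)
# => e.g. "Hello! How can I assist you today?"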
Streaming from Phoenix
As a last step, we’ll set up an API endpoint to stream these events to clients. We can use this as an exercise to map over the OpenAI stream, returning our own stream.
We’ll return newline-delimited JSON from our API. The POST request body for this endpoint is expected to have a single key named request that contains any values supported by OpenAI’s chat completion request.
Create lib/my_app_web/controllers/chat_controller.ex and add the following code.
defmodule MyAppWeb.ChatController do
  use MyAppWeb, :controller

  @nd_json_content_type "application/x-ndjson"

  def stream(conn, %{"request" => request}) do
    conn =
      conn
      |> put_resp_content_type(@nd_json_content_type)
      |> send_chunked(200)

    MyApp.Openai.chat_completion(request, fn data ->
      result = Jason.encode!(data)
      chunk(conn, result)
      chunk(conn, "\n")
    end)

    conn
  end
end
Note: Any request input validation is an exercise for the reader.
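One more hedged note: Plug.Conn.chunk/2 returns {:ok, conn} or {:error, reason}, and the code above ignores that result. If you want visibility into clients disconnecting mid-stream, a variant like this (my sketch, with require Logger added at the top of the module) could log failed writes:

MyApp.Openai.chat_completion(request, fn data ->
  case chunk(conn, Jason.encode!(data) <> "\n") do
    {:ok, _conn} ->
      :ok

    # The client has likely gone away; the write failed.
    {:error, reason} ->
      Logger.warning("chunk failed: #{inspect(reason)}")
  end
end)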
The last thing we need to do is add a route to our router to expose this endpoint.
pipeline :api do
  plug :accepts, ["json"]
end

scope "/api", MyAppWeb do
  pipe_through [:api]

  post "/chat", ChatController, :stream
end
Boom! We can now stream OpenAI chat completion responses through our Phoenix application using a response format that is dead simple for clients to parse.
curl -i 'http://localhost:4000/api/chat' -H "content-type: application/json" --data-raw '{"request":{"model":"gpt-3.5-turbo","temperature":1,"messages":[{"role":"user","content":"Hello 3.5!"}]}}'
HTTP/1.1 200 OK
content-type: application/x-ndjson; charset=utf-8
transfer-encoding: chunked
{"choices":[{"delta":{"content":"","role":"assistant"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"Hello"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"!"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" How"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" can"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" I"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" assist"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" you"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" today"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"?"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{},"finish_reason":"stop","index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
Wrapping up
Elixir is a joy to work with. Here it took less than 70 lines of code to write an OpenAI wrapper, map over the source streams, and stream the transformed result back to the client.
It’s worth noting that community-maintained OpenAI clients exist for Elixir but, as with many things in this ecosystem, I find the language itself often provides what I need without reaching for third-party libraries.
In the next post, we’ll dive deeper into parsing.