Streaming OpenAI in Elixir Phoenix

At Axflow, we are building open source tooling to help developers integrate large language models into their products. Most of this tooling has so far been written in TypeScript, though we’ve been exploring replacing part of our stack with a Phoenix application. We spend quite a bit of time working with OpenAI, and given that LLMs are all the rage, I wanted to write about how we’re integrating with it in our Phoenix application.

This post is the first in a series of posts about integrating with OpenAI’s chat completions endpoint.

  1. Part I (this post) implements a module for streaming chat completions and a Phoenix API endpoint to stream the completions to clients.
  2. Part II goes deeper on stream parsing and introduces state for more robust handling of HTTP streams.
  3. Part III uses Phoenix LiveView to stream completions to users connected to your site.

Environment

To follow along, you’ll need a Phoenix application and an OpenAI API key. If you don’t already have a Phoenix application, you can follow the up and running guide.

Calling OpenAI without streaming

To start simple and work our way up, let’s first write code to call OpenAI’s chat completion endpoint without streaming the response.

Let’s add Req to the list of dependencies as we’ll need an HTTP client.

{:req, "~> 0.4.0"}

Next, create a module named MyApp.Openai at lib/my_app/openai.ex. The initial requirements are:

  1. Be able to perform HTTP requests to OpenAI
  2. Ensure our HTTP requests are properly authenticated by their API
  3. Parse the JSON response into Elixir data structures

Req will automatically parse (non-streaming) JSON responses, leaving us to implement only the first two.

defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url, json: request)
  end
end

This code will call OpenAI’s chat completion endpoint, but without authentication the request will fail.

OpenAI expects your API key in the Authorization header to authenticate to their API. Before adding the API key to the request, the app must be configured with the key. In my case, I read the key from an environment variable in config/runtime.exs.

config :my_app, :openai, api_key: System.get_env("OPENAI_API_KEY")

Now we need to tell Req to add this key to the request. Req even has some sugar for the Authorization header.

defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url,
      json: request,
      auth: {:bearer, api_key()}
    )
  end

  defp api_key() do
    Application.get_env(:my_app, :openai)[:api_key]
  end
end

We can open up a shell with iex -S mix to try it out:

iex> {:ok, %{body: response}} = MyApp.Openai.chat_completion(%{ model: "gpt-3.5-turbo", messages: [%{role: "user", content: "Hello 3.5!"}] })
iex> response
%{
  "choices" => [
    %{
      "finish_reason" => "stop",
      "index" => 0,
      "message" => %{
        "content" => "Hello! How can I assist you today?",
        "role" => "assistant"
      }
    }
  ],
  # ...
}
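
To pull out just the assistant’s reply, pattern matching on the decoded map is usually enough. The sketch below is only an illustration: the module ResponseExample and its first_message/1 function are hypothetical names, not part of MyApp.Openai, and it assumes the response has the shape shown above.

```elixir
defmodule ResponseExample do
  # Hypothetical helper: returns the content of the first choice, or :error
  # when the map does not have the shape shown above.
  def first_message(%{"choices" => [%{"message" => %{"content" => content}} | _]}) do
    {:ok, content}
  end

  def first_message(_other), do: :error
end

# Trimmed copy of the response printed above.
response = %{
  "choices" => [
    %{
      "finish_reason" => "stop",
      "index" => 0,
      "message" => %{
        "content" => "Hello! How can I assist you today?",
        "role" => "assistant"
      }
    }
  ]
}

IO.inspect(ResponseExample.first_message(response))
```

Returning :error for anything else keeps the caller from crashing on an error payload, which has no "choices" key.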

Streaming the response

OpenAI’s API supports streaming responses. To stream the response, we need to set stream to true in the request body.

We can handle streams in Req a few ways, but here we’ll use a callback function passed through the into option. Since a function with a different arity is a separate function in Elixir, let’s add a two-argument version that also takes the callback. chat_completion/2 will be defined as:

def chat_completion(request, callback) do
  Req.post(@chat_completions_url,
    json: request,
    auth: {:bearer, api_key()},
    into: fn {:data, data}, acc ->
      callback.(data)
      {:cont, acc}
    end
  )
end

If we run this, setting stream to true in the request, we’ll see the response data as raw server-sent events.

MyApp.Openai.chat_completion(
  %{
    model: "gpt-3.5-turbo",
    stream: true,
    messages: [%{role: "user", content: "Hello 3.5!"}]
  },
  &IO.puts/1
)

Notice that the format for each event is data: <json-encoded data>\n\n, except the last which is always data: [DONE]\n\n. For example:

data: {"id":"chatcmpl-8UmwRPfWQVApzdIPgAtpcF3RnpeEr","object":"chat.completion.chunk","created":1702348663,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

Note that some chunks in the streamed response contain several of these events, while others contain only one. We must keep this in mind when parsing.
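
To make this concrete, here is a small sketch with a made-up chunk that holds two events. The chunk contents and the module name ChunkExample are invented for illustration; real chunks carry full JSON payloads like the one shown above. It only counts events by splitting on the blank line that ends each one.

```elixir
defmodule ChunkExample do
  # Splits a raw chunk on the blank line that terminates each event.
  # `trim: true` drops the empty string left after the final "\n\n".
  def events(chunk), do: String.split(chunk, "\n\n", trim: true)
end

# One chunk, as it might be handed to the callback, holding two events.
chunk = "data: {\"n\":1}\n\ndata: {\"n\":2}\n\n"

events = ChunkExample.events(chunk)
IO.inspect(length(events))
IO.inspect(events)
```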

Parsing the events into Elixir data structures

When we stream the response, we need to handle parsing the data ourselves. Because Elixir strings are UTF-8 encoded binaries, we don’t have to differentiate between raw bytes and language-level strings, and the String module handles Unicode and graphemes for us. Parsing these events is therefore a simple string parsing exercise.

Given the following streaming input:

data: {"json":"object"}

data: {"json":"object 2"}

data: {"json":"object N"}

data: [DONE]

For each event, we need to:

  1. Strip the data: prefix from the chunk
  2. Remove the trailing newlines
  3. Extract and parse the JSON object

This is trivial to express in Elixir:

defp parse(chunk) do
  chunk
  |> String.split("data: ")
  |> Enum.map(&String.trim/1)
  |> Enum.map(&decode/1)
  |> Enum.reject(&is_nil/1)
end

defp decode(""), do: nil
defp decode("[DONE]"), do: nil
defp decode(data), do: Jason.decode!(data)

The decode/1 function will receive some empty strings as a result of the string splitting, so we must ignore those. It must also ignore the final [DONE] event. Otherwise, we return the decoded JSON. Once parse/1 rejects the nil values, we’re left with only the decoded JSON.
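
To see where the empty strings come from, the sketch below runs only the split and trim steps on a small made-up chunk and leaves out the JSON decoding. SplitExample and payloads/1 are hypothetical names used for this illustration.

```elixir
defmodule SplitExample do
  # Same split and trim steps as parse/1, without the decoding step,
  # so the intermediate strings are visible.
  def payloads(chunk) do
    chunk
    |> String.split("data: ")
    |> Enum.map(&String.trim/1)
  end
end

chunk = "data: {\"json\":\"object\"}\n\ndata: [DONE]\n\n"

# The chunk starts with "data: ", so the split yields a leading "".
IO.inspect(SplitExample.payloads(chunk))
```

The first element is the empty string that precedes the first data: prefix, and the last is the [DONE] marker. Those are the two cases decode/1 maps to nil.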

Lastly, we need to update chat_completion/2 to make use of parse/1. Remove the line that invokes the callback and replace it with the following:

 def chat_completion(request, callback) do
   Req.post(@chat_completions_url,
     json: request,
     auth: {:bearer, api_key()},
     into: fn {:data, data}, acc ->
-      callback.(data)
+      Enum.each(parse(data), callback)
       {:cont, acc}
     end
   )
 end

If we run our chat_completion/2 function with a callback that prints its argument, we’ll see each chunk of the stream as an Elixir map!
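
A callback will often want only the text of each chunk rather than the whole map. The following is a minimal sketch of one way to do that, assuming chunks shaped like the chat.completion.chunk example shown earlier. DeltaExample, content/1, and join/1 are hypothetical names, and the sample chunks are trimmed to the keys the code reads.

```elixir
defmodule DeltaExample do
  # Returns the text carried by one decoded chunk.
  def content(%{"choices" => [%{"delta" => %{"content" => text}} | _]})
      when is_binary(text),
      do: text

  # Chunks without text (for example the final chunk, whose delta is an
  # empty map) contribute nothing.
  def content(_chunk), do: ""

  # Joins the text of a list of decoded chunks into one string.
  def join(chunks), do: Enum.map_join(chunks, &content/1)
end

chunks = [
  %{"choices" => [%{"delta" => %{"role" => "assistant", "content" => ""}}]},
  %{"choices" => [%{"delta" => %{"content" => "Hello"}}]},
  %{"choices" => [%{"delta" => %{"content" => "!"}}]},
  %{"choices" => [%{"delta" => %{}, "finish_reason" => "stop"}]}
]

IO.puts(DeltaExample.join(chunks))
```

With this in place, passing &IO.write(DeltaExample.content(&1)) as the callback to chat_completion/2 would print the reply as it arrives.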

Improving the interface

Before moving on, one thing we should do to make our code robust is to ensure it’s used correctly. We implemented two functions, chat_completion/1 and chat_completion/2. The first does not support streaming, while the second expects it. Rather than ask callers to remember this, let’s take responsibility for the stream property away from the caller. We can add a function to ensure the request body’s stream property is set to the expected value in all cases.

  def chat_completion(request) do
    Req.post(@chat_completions_url,
-     json: request,
+     json: set_stream(request, false),
      auth: {:bearer, api_key()}
    )
  end

  def chat_completion(request, callback) do
    Req.post(@chat_completions_url,
-     json: request,
+     json: set_stream(request, true),
      auth: {:bearer, api_key()},
      into: fn {:data, data}, acc ->
        Enum.each(parse(data), callback)
        {:cont, acc}
      end
    )
  end

+ defp set_stream(request, value) do
+   request
+   |> Map.drop([:stream, "stream"])
+   |> Map.put(:stream, value)
+ end
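
Dropping both :stream and "stream" matters because requests built in code tend to use atom keys while params decoded by Phoenix use string keys. The sketch below copies set_stream/2 into a hypothetical public module, SetStreamExample, so it can be called directly on both kinds of map.

```elixir
defmodule SetStreamExample do
  # Public copy of set_stream/2 for experimentation.
  def set_stream(request, value) do
    request
    |> Map.drop([:stream, "stream"])
    |> Map.put(:stream, value)
  end
end

# String keys, as a request decoded from a JSON body would have.
from_params = %{"model" => "gpt-3.5-turbo", "stream" => false}
IO.inspect(SetStreamExample.set_stream(from_params, true))

# Atom keys, as a request built in code might have.
from_code = %{model: "gpt-3.5-turbo", stream: true}
IO.inspect(SetStreamExample.set_stream(from_code, false))
```

Without the drop, a map holding both "stream" and :stream would be encoded as JSON with a duplicate stream key.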

With that, our module to handle both streaming and non-streaming calls to OpenAI’s chat completion endpoint is complete! Below is the final code:

defmodule MyApp.Openai do
  @chat_completions_url "https://api.openai.com/v1/chat/completions"

  def chat_completion(request) do
    Req.post(@chat_completions_url,
      json: set_stream(request, false),
      auth: {:bearer, api_key()}
    )
  end

  def chat_completion(request, callback) do
    Req.post(@chat_completions_url,
      json: set_stream(request, true),
      auth: {:bearer, api_key()},
      into: fn {:data, data}, acc ->
        Enum.each(parse(data), callback)
        {:cont, acc}
      end
    )
  end

  defp set_stream(request, value) do
    request
    |> Map.drop([:stream, "stream"])
    |> Map.put(:stream, value)
  end

  defp parse(chunk) do
    chunk
    |> String.split("data: ")
    |> Enum.map(&String.trim/1)
    |> Enum.map(&decode/1)
    |> Enum.reject(&is_nil/1)
  end

  defp decode(""), do: nil
  defp decode("[DONE]"), do: nil
  defp decode(data), do: Jason.decode!(data)

  defp api_key() do
    Application.get_env(:my_app, :openai)[:api_key]
  end
end

Streaming from Phoenix

As a last step, we’ll set up an API to stream these events to clients. We can use this as an exercise to map over the OpenAI stream, returning our own stream. We’ll return newline-delimited JSON from our API. The POST request body for this endpoint is expected to have a single key named request that contains any values supported by OpenAI’s chat completion request.

Create lib/my_app_web/controllers/chat_controller.ex and add the following code.

defmodule MyAppWeb.ChatController do
  use MyAppWeb, :controller

  @nd_json_content_type "application/x-ndjson"

  def stream(conn, %{"request" => request}) do
    conn =
      conn
      |> put_resp_content_type(@nd_json_content_type)
      |> send_chunked(200)

    MyApp.Openai.chat_completion(request, fn data ->
      result = Jason.encode!(data)
      chunk(conn, result)
      chunk(conn, "\n")
    end)

    conn
  end
end

Note: Any request input validation is an exercise for the reader.
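
As a starting point for that exercise, here is one possible sketch of a validation function. It is not part of the controller above. The module RequestValidationExample, the function validate/1, and the error atoms are all hypothetical, and the rules are an assumption that only covers plain text messages (a string model plus a non-empty list of messages with string role and content).

```elixir
defmodule RequestValidationExample do
  # Accepts a request only if it has a string "model" and a non-empty
  # list of messages, each with a string "role" and "content".
  def validate(%{"model" => model, "messages" => [_ | _] = messages} = request)
      when is_binary(model) do
    if Enum.all?(messages, &valid_message?/1) do
      {:ok, request}
    else
      {:error, :invalid_messages}
    end
  end

  def validate(_request), do: {:error, :invalid_request}

  defp valid_message?(%{"role" => role, "content" => content})
       when is_binary(role) and is_binary(content),
       do: true

  defp valid_message?(_message), do: false
end

request = %{
  "model" => "gpt-3.5-turbo",
  "messages" => [%{"role" => "user", "content" => "Hello 3.5!"}]
}

IO.inspect(RequestValidationExample.validate(request))
IO.inspect(RequestValidationExample.validate(%{"model" => "gpt-3.5-turbo"}))
```

A controller could call a function like this before send_chunked/2 and respond with a 400 status on {:error, reason}, since the status can no longer change once the chunked response has started.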

The last thing we need to do is add a route to our routes file to expose this endpoint.

pipeline :api do
  plug :accepts, ["json"]
end

scope "/api", MyAppWeb do
  pipe_through [:api]

  post "/chat", ChatController, :stream
end

Boom! We can now stream OpenAI chat completion responses through our Phoenix application using a response format that is dead simple for clients to parse.

curl -i 'http://localhost:4000/api/chat' -H "content-type: application/json" --data-raw '{"request":{"model":"gpt-3.5-turbo","temperature":1,"messages":[{"role":"user","content":"Hello 3.5!"}]}}'
HTTP/1.1 200 OK
content-type: application/x-ndjson; charset=utf-8
transfer-encoding: chunked

{"choices":[{"delta":{"content":"","role":"assistant"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"Hello"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"!"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" How"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" can"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" I"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" assist"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" you"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":" today"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{"content":"?"},"finish_reason":null,"index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}
{"choices":[{"delta":{},"finish_reason":"stop","index":0,"logprobs":null}],"created":1704667517,"id":"chatcmpl-8eWBJyuPK2Hyq5COCNbfR02BuYoUj","model":"gpt-3.5-turbo-0613","object":"chat.completion.chunk","system_fingerprint":null}

Wrapping up

Elixir is a joy to work with. Here it took fewer than 70 lines of code to write an OpenAI wrapper, map over the source stream, and stream the transformed result back to the client.

It’s worth noting that community-maintained OpenAI clients exist for Elixir, but as with many things in this ecosystem, I find that the language itself often provides what I need without having to reach for third-party libraries.

In the next post, we’ll dive deeper into parsing.