From eebaa185cd8959e1c83cf77a900f563363fa4d91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Darko=20Mijic=CC=81?=
Date: Sat, 17 May 2025 05:29:05 +0200
Subject: [PATCH] Docs: Enhance root and example app README files for clarity and detail

---
 README.md         | 300 ++++++++++++++++------------------
 example/README.md | 407 +++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 524 insertions(+), 183 deletions(-)

diff --git a/README.md b/README.md
index 181b9ff..6a25789 100644
--- a/README.md
+++ b/README.md
@@ -4,22 +4,53 @@
-This Convex component enables persistent text streaming. It provides a React hook
-for streaming text from HTTP actions while simultaneously storing the data in the
-database. This persistence allows the text to be accessed after the stream ends
-or by other users.
+This Convex component solves a common, thorny challenge in building modern interactive applications: **how to deliver real-time, token-by-token text streaming to a user while also durably persisting that content for later access, reloads, or observation by others.**
 
-The most common use case is for AI chat applications. The example app (found in the
-`example` directory) is a just such a simple chat app that demonstrates use of the
-component.
+It's particularly well-suited for applications that generate text incrementally, such as:
+
+* AI-powered chat and content generation features.
+* Live data feeds and activity logs.
+* Progress indicators for long-running jobs.
 
-Here's what you'll end up with! The left browser window is streaming the chat body to the client,
-and the right browser window is subscribed to the chat body via a database query. The
-message is only updated in the database on sentence boundaries, whereas the HTTP
-stream sends tokens as they come:
+## The Problem: Choosing Between Speed and Durability
+
+When streaming text, developers often face a trade-off:
+
+1. **Pure HTTP Streaming:**
+   * **Pro:** Delivers the lowest latency to the active user, as text chunks arrive directly and immediately.
+   * **Con:** The data is ephemeral. A page refresh, a lost connection, or the need for another user to view the content means the streamed information is gone, because it was never stored.
+
+2. **Pure Database Persistence (for every chunk):**
+   * **Pro:** All data is durably stored in Convex and accessible. Convex's reactivity can update observers.
+   * **Con:** Writing every tiny text chunk (e.g., individual LLM tokens) to the database is inefficient: it creates a high write load and heavy network traffic for subscribers (entire documents may be resent frequently), and batching updates aggressively to compensate makes the UX feel less fluid.
+
+**This component eliminates that trade-off.**
+
+## The Solution: Intelligent Dual-Path Streaming
+
+`@convex-dev/persistent-text-streaming` provides a "best of both worlds" approach:
+
+* **For the "driving" client (the user initiating the stream):** Text is streamed token-by-token directly over an HTTP connection, ensuring a highly responsive, immediate experience.
+* **For persistence and other clients:** Simultaneously, the component buffers the streamed text and writes it to the Convex database in optimized chunks (e.g., sentence by sentence). This ensures data durability and efficient updates for:
+  * the same user after a page reload,
+  * other users observing the stream, and
+  * long-term archival and retrieval.
+
+This dual-path mechanism delivers a superior user experience without compromising on data integrity or scalability.
+
+**See It In Action:**
+The active user (left) sees a word-by-word stream, while an observer (right) sees updates from the database, typically at sentence boundaries for efficiency.
 
 ![example-animation](./anim.gif)
 
+## Key Features
+
+* **Low-Latency Streaming:** Provides an immediate, token-by-token experience for the active user.
+* **Durable Persistence:** Reliably stores the complete streamed text in your Convex database.
+* **Efficient Updates for Observers:** Optimizes database writes and leverages Convex's reactivity for non-driving clients.
+* **Seamless Experience:** Gracefully handles page reloads and concurrent viewers.
+* **Simplified Development:** Abstracts the complex logic of managing concurrent HTTP streaming and database persistence.
+* **Flexible:** Suitable for any text-generation source (LLMs, data processing, live logs, etc.).
+
 ## Pre-requisite: Convex
 
 You'll need an existing Convex project to use the component.
@@ -30,160 +61,103 @@ Run `npm create convex` or follow any of the [quickstarts](https://docs.convex.d
 
 ## Installation
 
-See [`example/`](./example/convex/) for a working demo.
-
-1. Install the Persistent Text Streaming component:
-
-```bash
-npm install @convex-dev/persistent-text-streaming
-```
-
-2. Create a [`convex.config.ts`](./example/convex/convex.config.ts) file in your
-   app's `convex/` folder and install the component by calling `use`:
-
-```ts
-// convex/convex.config.ts
-import { defineApp } from "convex/server";
-import persistentTextStreaming from "@convex-dev/persistent-text-streaming/convex.config";
-
-const app = defineApp();
-app.use(persistentTextStreaming);
-export default app;
-```
-
-## Usage
-
-Here's a simple example of how to use the component:
-
-In `convex/chat.ts`:
-
-```ts
-const persistentTextStreaming = new PersistentTextStreaming(
-  components.persistentTextStreaming
-);
-
-// Create a stream using the component and store the id in the database with
-// our chat message.
-export const createChat = mutation({
-  args: {
-    prompt: v.string(),
-  },
-  handler: async (ctx, args) => {
-    const streamId = await persistentTextStreaming.createStream(ctx);
-    const chatId = await ctx.db.insert("chats", {
-      title: "...",
-      prompt: args.prompt,
-      stream: streamId,
-    });
-    return chatId;
-  },
-});
-
-// Create a query that returns the chat body.
-export const getChatBody = query({
-  args: {
-    streamId: StreamIdValidator,
-  },
-  handler: async (ctx, args) => {
-    return await persistentTextStreaming.getStreamBody(
-      ctx,
-      args.streamId as StreamId
-    );
-  },
-});
-
-// Create an HTTP action that generates chunks of the chat body
-// and uses the component to stream them to the client and save them to the database.
-export const streamChat = httpAction(async (ctx, request) => {
-  const body = (await request.json()) as {streamId: string};
-  const generateChat = async (ctx, request, streamId, chunkAppender) => {
-    await chunkAppender("Hi there!");
-    await chunkAppender("How are you?");
-    await chunkAppender("Pretend I'm an AI or something!");
-  };
-
-  const response = await persistentTextStreaming.stream(
-    ctx,
-    request,
-    body.streamId as StreamId,
-    generateChat
-  );
-
-  // Set CORS headers appropriately.
-  response.headers.set("Access-Control-Allow-Origin", "*");
-  response.headers.set("Vary", "Origin");
-  return response;
-});
-```
-
-You need to expose this HTTP endpoint in your backend, so in `convex/http.ts`:
-
-```ts
-http.route({
-  path: "/chat-stream",
-  method: "POST",
-  handler: streamChat,
-});
-```
-
-Finally, in your app, you can now create chats and them subscribe to them
-via stream and/or database query as optimal:
-
-```ts
-// chat-input.tsx, maybe?
-const createChat = useMutation(api.chat.createChat);
-const formSubmit = async (e: React.FormEvent) => {
-  e.preventDefault();
-  const chatId = await createChat({
-    prompt: inputValue,
-  });
-};
-
-// chat-message.tsx, maybe?
-import { useStream } from "@convex-dev/persistent-text-streaming/react";
-
-// ...
-
-// In our component:
-const { text, status } = useStream(
-  api.chat.getChatBody, // The query to call for the full stream body
-  new URL(`${convexSiteUrl}/chat-stream`), // The HTTP endpoint for streaming
-  driven, // True if this browser session created this chat and should generate the stream
-  chat.streamId as StreamId // The streamId from the chat database record
-);
-```
-
-## Design Philosophy
-
-This component balances HTTP streaming with database persistence to try to
-maximize the benefits of both. To understand why this balance is beneficial,
-let's examine each approach in isolation.
-
-- **HTTP streaming only**: If your app _only_ uses HTTP streaming, then the
-  original browser that made the request will have a great, high-performance
-  streaming experience. But if that HTTP connection is lost, if the browser
-  window is reloaded, if other users want to view the same chat, or this
-  users wants to revisit the conversation later, it won't be possible. The
-  conversation is only ephemeral because it was never stored on the server.
-
-- **Database Persistence Only**: If your app _only_ uses database persistence,
-  it's true that the conversation will be available for as long as you want.
-  Additionally, Convex's subscriptions will ensure the chat message is updated
-  as new text chunks are generated. However, there are a few downsides: one,
-  the entire chat body needs to be resent every time it is changed, which is a
-  lot redundant bandwidth to push into the database and over the websockets to
-  all connected clients. Two, you'll need to make a difficult tradeoff between
-  interactivity and efficiency. If you write every single small chunk to the
-  database, this will get quite slow and expensive. But if you batch up the chunks
-  into, say, paragraphs, then the user experience will feel laggy.
-
-This component combines the best of both worlds. The original browser that
-makes the request will still have a great, high-performance streaming experience.
-But the chat body is also stored in the database, so it can be accessed by the
-client even after the stream has finished, or by other users, etc.
+1. **Install the package** into your Convex project:
+
+   ```bash
+   npm install @convex-dev/persistent-text-streaming
+   ```
+
+2. **Register the component** in your Convex backend (`convex/convex.config.ts`):
+
+   ```typescript
+   // convex/convex.config.ts
+   import { defineApp } from "convex/server";
+   import persistentTextStreaming from "@convex-dev/persistent-text-streaming/convex.config";
+
+   const app = defineApp();
+   // This makes the component's backend functions available under
+   // `components.persistentTextStreaming`.
+   app.use(persistentTextStreaming);
+
+   export default app;
+   ```
+
+## Usage Overview
+
+Integrating `@convex-dev/persistent-text-streaming` involves a backend setup to manage and serve the stream, and a frontend (React) setup to consume and display it. Here's a high-level look at the core steps (a condensed code sketch follows the lists):
+
+**On the Backend (Convex):**
+
+1. **Initialize Component Client:** Instantiate `PersistentTextStreaming` using `components.persistentTextStreaming` (from `_generated/api`).
+2. **Create Stream ID:** In a mutation, when an operation that generates text begins (e.g., a user sends a message, a task starts), call `streamingComponent.createStream(ctx)`. This returns a unique `StreamId`.
+3. **Store Stream ID:** Save this `StreamId` in your relevant application database document (e.g., alongside a user's message or a task record).
+4. **Implement HTTP Streaming Action:**
+   * Create a Convex HTTP action. The "driving" client will `POST` to this action, sending the `StreamId`.
+   * Inside this action, call `streamingComponent.stream(ctx, request, streamId, writerCallback)`.
+   * Your `writerCallback` (an async function you provide) generates or fetches text and calls the provided `append(textChunk)` function. `append` immediately sends the chunk over the HTTP response *and* queues it for optimized database persistence.
+5. **Query Persisted Data:** Create a Convex query using `streamingComponent.getStreamBody(ctx, streamId)` to retrieve the complete, persisted text and status for any given `StreamId`. This is used by non-driving clients and as a fallback.
+
+**On the Frontend (React):**
+
+1. **Use the `useStream` Hook:** Import `useStream` from `@convex-dev/persistent-text-streaming/react`.
+2. **Provide Hook Parameters:**
+   * Your Convex query for fetching persisted data (step 5 above).
+   * The full URL to your HTTP streaming action (step 4 above).
+   * An `isDriven` boolean flag: `true` if this client session initiated the stream, `false` otherwise.
+   * The `StreamId` of the content to display.
+3. **Render Streamed Text:** The hook returns `{ text, status }`, automatically managing data fetching (via the HTTP stream if `isDriven`, or via the database query otherwise) and providing reactive updates.
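+
+To make these steps concrete, here is a condensed sketch adapted from the example app. Names such as `chats`, `createChat`, and `/chat-stream` are illustrative, and it assumes your schema defines a `chats` table with a `streamId` field:
+
+```typescript
+// convex/chat.ts (condensed sketch)
+import { mutation, query, httpAction } from "./_generated/server";
+import { components } from "./_generated/api";
+import {
+  PersistentTextStreaming,
+  StreamId,
+  StreamIdValidator,
+} from "@convex-dev/persistent-text-streaming";
+import { v } from "convex/values";
+
+const streaming = new PersistentTextStreaming(
+  components.persistentTextStreaming
+);
+
+// Steps 2 & 3: create a stream and store its ID on your own document.
+export const createChat = mutation({
+  args: { prompt: v.string() },
+  handler: async (ctx, args) => {
+    const streamId = await streaming.createStream(ctx);
+    return await ctx.db.insert("chats", { prompt: args.prompt, streamId });
+  },
+});
+
+// Step 5: expose the persisted stream body to any client.
+export const getChatBody = query({
+  args: { streamId: StreamIdValidator },
+  handler: async (ctx, args) =>
+    await streaming.getStreamBody(ctx, args.streamId as StreamId),
+});
+
+// Step 4: the HTTP action the driving client POSTs to (route it in convex/http.ts).
+export const streamChat = httpAction(async (ctx, request) => {
+  const { streamId } = (await request.json()) as { streamId: string };
+  const response = await streaming.stream(
+    ctx,
+    request,
+    streamId as StreamId,
+    async (_ctx, _req, _id, append) => {
+      // Generate text from any source; each appended chunk is sent over
+      // HTTP immediately and queued for persistence in the database.
+      await append("Hi there! ");
+      await append("Pretend I'm an AI.");
+    }
+  );
+  // Needed if your frontend runs on a different origin.
+  response.headers.set("Access-Control-Allow-Origin", "*");
+  response.headers.set("Vary", "Origin");
+  return response;
+});
+```
+
+On the frontend, the matching `useStream` call looks like this, where `convexSiteUrl`, `isDriven`, and `streamId` come from your app:
+
+```typescript
+import { useStream } from "@convex-dev/persistent-text-streaming/react";
+import { api } from "../convex/_generated/api";
+
+const { text, status } = useStream(
+  api.chat.getChatBody, // query for the persisted body
+  new URL(`${convexSiteUrl}/chat-stream`), // your HTTP streaming route
+  isDriven, // did this session start the stream?
+  streamId // which stream to display
+);
+```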
+
+**For a comprehensive, step-by-step guide with detailed code examples from a working AI chat application, see the [Example App Implementation Guide](./example/README.md).** It walks through schema design, all necessary backend functions, and frontend React component integration.
+
+## How It Works: The Dual-Path Mechanism
+
+The power of this component lies in how it handles streaming and persistence, orchestrated by the `useStream` hook on the frontend based on the `isDriven` flag:
+
+* **When `isDriven` is `true` (e.g., the client that submitted an AI prompt):**
+  1. The `useStream` hook makes an HTTP `POST` request to your configured `streamUrl` (your HTTP streaming action), passing the `streamId`.
+  2. Your backend HTTP action, using `streamingComponent.stream()`, starts generating text and calls `append(chunk)`.
+  3. The `append` function *immediately* sends `chunk` over the HTTP response to this driving client.
+  4. Simultaneously, `append` buffers chunks and schedules optimized writes to the Convex database (e.g., at sentence boundaries).
+  5. The driving client experiences very low-latency, token-by-token updates directly from the HTTP stream.
+
+* **When `isDriven` is `false` (e.g., another user viewing the same chat, or the original user after a page reload):**
+  1. The `useStream` hook *does not* make an HTTP request to `streamUrl`.
+  2. Instead, it relies on the Convex query you provided (e.g., `api.yourModule.getStreamBody`) to fetch the text.
+  3. As the backend (driven by the *other* client or its initial action) persists chunks to the database, Convex's reactivity system automatically updates the query results for these non-driving clients.
+  4. These observer clients see updates as they are committed to the database, typically in slightly larger, more efficient batches.
+
+This mechanism ensures the initiating user gets the fastest possible experience, while all other viewers receive consistent, durable data efficiently.
+
+## Viewing Component Data in the Convex Dashboard
+
+This component manages its own data tables within your Convex project. You can inspect this data:
+
+1. Click on your Convex project in the [Convex Dashboard](https://dashboard.convex.dev) and go to the "Data" section.
+2. Use the component selector (above the table list) to switch from `app` to `persistentTextStreaming`.
+
+You'll find two tables:
+
+* **`streams`**: Records for each stream, tracking its overall status (e.g., `pending`, `streaming`, `done`, `error`, `timeout`).
+* **`chunks`**: The actual text content, broken into pieces and linked to a `streamId`.
+
+## API Highlights
+
+### Backend (`PersistentTextStreaming` class)
+
+* `new PersistentTextStreaming(components.persistentTextStreaming)`: Initializes the component client.
+* `async createStream(ctx: MutationCtx): Promise<StreamId>`: Creates a unique stream ID.
+* `async stream(ctx: ActionCtx, request: Request, streamId: StreamId, writerCallback)`: The core method for HTTP streaming and database persistence. The `writerCallback` (an async function you provide) receives an `append(text: string)` function to send text chunks.
+* `async getStreamBody(ctx: QueryCtx, streamId: StreamId): Promise<{ text: string; status: StreamStatus }>`: Retrieves the full persisted text and current status of a stream.
+
+(Refer to the source code and the [Example App Implementation Guide](./example/README.md) for full type signatures and advanced usage.)
+
+### Frontend (`useStream` React Hook)
+
+* `useStream(getPersistentBodyQuery, streamUrl, isDriven, streamId)`:
+  * `getPersistentBodyQuery`: Convex query to fetch persisted stream data.
+  * `streamUrl`: Full URL to your HTTP streaming action.
+  * `isDriven`: Boolean, `true` if this client initiated/drives the stream.
+  * `streamId`: The ID of the stream to display.
+  * Returns `{ text: string; status: StreamStatus }`, managing data fetching based on `isDriven` and providing reactive updates.
 
 ## Background
 
-This component is largely based on the Stack post [AI Chat with HTTP Streaming](https://stack.convex.dev/ai-chat-with-http-streaming).
+This component's approach and design are largely based on the concepts discussed in the Convex Stack post [AI Chat with HTTP Streaming](https://stack.convex.dev/ai-chat-with-http-streaming).
diff --git a/example/README.md b/example/README.md
index 14a90ed..be470fc 100644
--- a/example/README.md
+++ b/example/README.md
@@ -1,34 +1,401 @@
-# Chat Example App
+# Example App: AI Chat with Persistent Text Streaming
 
-This is a simple chat app that uses the persistent text streaming component.
-When a new prompt is submitted, the app will stream the response from the OpenAI API
-back to the client and write the prompt and response to the database in an efficient
-way. When the app is refreshed, it will use the database to value to restore the
-chat. Other concurrent browser sessions will also see the chat updates by subscribing
-to the Convex database records in the usual way.
+This is a simple AI-powered chat application built with React, Vite, and Convex. It serves as a practical demonstration of the **`@convex-dev/persistent-text-streaming`** component, showcasing how to achieve real-time, low-latency text streaming to the active user while simultaneously ensuring the conversation is durably persisted in the Convex database for reloads and other observers.
 
-## Running the app
+## Running the Example
 
 ### Prerequisites
 
-- Node.js (v18+)
-- npm (v10+)
+* Node.js (v18 or newer recommended)
+* npm (v10 or newer recommended)
+* A Convex account and project set up. If you don't have one, run `npm create convex`.
 
-### Provisioning the backend and starting the frontend
+### 1. Install Dependencies
+
+Install dependencies in both the repository root and this `example` directory:
 
 ```bash
+# From the root of the repository
+npm install
+
+# Navigate into the example directory
+cd example
 npm install
-npm run dev # in one terminal
-npm run dev:frontend # in another terminal
 ```
 
-### Establishing your OPENAI_API_KEY
+### 2. Configure OpenAI API Key
+
+This chat app uses OpenAI's API to generate responses. You'll need to set your `OPENAI_API_KEY` as an environment variable in your Convex deployment:
+
+1. Obtain an API key from [OpenAI](https://platform.openai.com/api-keys).
+2. In your terminal (while in the `example` directory or the project root), run:
+   ```bash
+   npx convex env set OPENAI_API_KEY <your-api-key>
+   ```
+   Replace `<your-api-key>` with your actual key.
+
+### 3. Run the Development Servers
+
+You'll need two terminal sessions: one for the Convex backend and one for the Vite frontend.
+
+**Terminal 1: Convex Backend**
+(From the `example` directory)
+```bash
+npm run dev:backend
+# This runs: convex dev --live-component-sources --typecheck-components
+```
+This command starts the Convex local development backend. The flags ensure it picks up the component source code correctly for live development.
+
+**Terminal 2: Vite Frontend**
+(From the `example` directory)
+```bash
+npm run dev:frontend
+# This runs: vite
+```
+This starts the React frontend development server. Your browser should automatically open to the chat application. If not, navigate to the URL shown in the terminal (usually `http://localhost:5173`).
+
+You should now be able to interact with the chat application!
+
+## How `@convex-dev/persistent-text-streaming` is Used
+
+This example app leverages the component in several key areas, both on the backend (Convex functions) and the frontend (React components).
+
+### A. Backend Setup (Convex Functions)
+
+#### 1. Component Registration (`convex/convex.config.ts`)
+
+First, the component is registered with the Convex app. This makes its backend functions available to our application.
+
+```typescript
+// example/convex/convex.config.ts
+import { defineApp } from "convex/server";
+import persistentTextStreaming from "@convex-dev/persistent-text-streaming/convex.config";
+
+const app = defineApp();
+// The `use` method registers the component's schema and functions
+// under the 'persistentTextStreaming' namespace.
+app.use(persistentTextStreaming);
+
+export default app;
+```
+After this, the component's API can be accessed via `components.persistentTextStreaming` in other Convex functions.
+
+#### 2. Database Schema (`convex/schema.ts`)
+
+The application defines a `userMessages` table to store user prompts and link them to the AI's streamed response.
+
+```typescript
+// example/convex/schema.ts
+import { defineSchema, defineTable } from "convex/server";
+// StreamIdValidator ensures the ID format is correct for the component.
+import { StreamIdValidator } from "@convex-dev/persistent-text-streaming";
+import { v } from "convex/values";
+
+export default defineSchema({
+  userMessages: defineTable({
+    prompt: v.string(),
+    // This field stores the unique ID for the AI's response stream,
+    // managed by the persistent-text-streaming component.
+    responseStreamId: StreamIdValidator,
+  }).index("by_stream", ["responseStreamId"]), // Optional index
+});
+```
+The component itself internally manages its own `streams` and `chunks` tables; you don't define those in your application's schema.
+
+#### 3. Initializing the Component Client (`convex/streaming.ts`)
+
+A client instance for the `PersistentTextStreaming` component is created to interact with its API.
+
+```typescript
+// example/convex/streaming.ts
+import {
+  PersistentTextStreaming,
+  StreamId,
+  StreamIdValidator,
+} from "@convex-dev/persistent-text-streaming";
+// `components` is auto-generated, providing access to mounted component APIs.
+import { components } from "./_generated/api";
+import { query } from "./_generated/server";
+
+// Initialize the component client, pointing to its registered API namespace.
+export const streamingComponent = new PersistentTextStreaming(
+  components.persistentTextStreaming
+);
+
+// Query to retrieve the fully assembled text and status of a persisted stream.
+// This is used by non-driving clients or as a fallback by the `useStream` hook.
+export const getStreamBody = query({
+  args: {
+    streamId: StreamIdValidator,
+  },
+  handler: async (ctx, args) => {
+    return await streamingComponent.getStreamBody(
+      ctx,
+      args.streamId as StreamId // Cast to the strong StreamId type
+    );
+  },
+});
+```
+
+#### 4. Creating a Stream on New User Message (`convex/messages.ts -> sendMessage`)
+
+When a user sends a new prompt, the `sendMessage` mutation is called. It prepares for the AI's streamed response by creating a new stream ID.
+
+```typescript
+// example/convex/messages.ts
+import { query, mutation, internalQuery } from "./_generated/server";
+import { StreamId } from "@convex-dev/persistent-text-streaming";
+import { v } from "convex/values";
+import { streamingComponent } from "./streaming"; // Our initialized component client
+// ... other imports
+
+export const sendMessage = mutation({
+  args: {
+    prompt: v.string(),
+  },
+  handler: async (ctx, args) => {
+    // 1. Create a new stream ID using the component. This prepares the backend
+    //    to receive and persist chunks for this specific response.
+    const responseStreamId = await streamingComponent.createStream(ctx);
+
+    // 2. Insert the user's message into our `userMessages` table,
+    //    linking it to the newly created `responseStreamId`.
+    const chatId = await ctx.db.insert("userMessages", {
+      prompt: args.prompt,
+      responseStreamId, // This ID will be used by the frontend to fetch the stream.
+    });
+
+    // Note: The actual AI generation and streaming to this `responseStreamId`
+    // is typically triggered by an HTTP action, called by the frontend.
+    return chatId; // Returns the ID of the `userMessages` document.
+  },
+});
+```
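+
+The frontend's `ChatWindow` component (shown later) also lists messages via `api.messages.listMessages`, which this guide doesn't reproduce. A minimal version might look like the following sketch (not the exact example source):
+
+```typescript
+// example/convex/messages.ts (sketch of listMessages)
+export const listMessages = query({
+  args: {},
+  handler: async (ctx) => {
+    // Return all user messages; each carries its `responseStreamId`,
+    // which the frontend passes to `useStream`.
+    return await ctx.db.query("userMessages").collect();
+  },
+});
+```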
+
+#### 5. Streaming AI Response via HTTP Action (`convex/chat.ts -> streamChat`)
+
+The core of the streaming logic resides in an HTTP action. The frontend (specifically the `useStream` hook for the "driving" client) calls this endpoint to initiate the AI response generation and receive it as an HTTP stream.
+
+```typescript
+// example/convex/chat.ts
+import { httpAction } from "./_generated/server";
+import { internal } from "./_generated/api";
+import { StreamId } from "@convex-dev/persistent-text-streaming";
+import { OpenAI } from "openai";
+import { streamingComponent } from "./streaming";
+
+const openai = new OpenAI(); // Requires OPENAI_API_KEY in Convex env vars
+
+export const streamChat = httpAction(async (ctx, request) => {
+  // The frontend sends the `streamId` (obtained from the `userMessages` document)
+  // to identify which stream this AI response belongs to.
+  const body = (await request.json()) as {
+    streamId: string;
+  };
+
+  // `streamingComponent.stream()` is the central method. It:
+  // - Takes care of setting up the HTTP streaming response to the client.
+  // - Provides an `append` function to its callback.
+  // - Persists appended text to the database.
+  const response = await streamingComponent.stream(
+    ctx, // ActionCtx, can be used to run queries/mutations if needed
+    request, // The original HTTP request object
+    body.streamId as StreamId, // The specific stream to write to
+    // This async callback is where the AI response is generated and "appended":
+    async (_actionCtx, _httpRequest, _sId, append) => {
+      const history = await ctx.runQuery(internal.messages.getHistory); // For context
+
+      const stream = await openai.chat.completions.create({
+        model: "gpt-4.1-mini", // Or your preferred model
+        messages: [
+          { role: "system", content: "You are a helpful assistant..." },
+          ...history,
+        ],
+        stream: true,
+      });
+
+      // As OpenAI (or any source) streams chunks of text:
+      for await (const part of stream) {
+        const content = part.choices[0]?.delta?.content || "";
+        // Calling `append(content)` does two things:
+        // 1. Sends `content` immediately over the HTTP stream to the "driving" client.
+        // 2. Buffers `content` and schedules it for persistence in the database
+        //    (typically flushed at sentence boundaries or when the stream ends).
+        await append(content);
+      }
+    }
+  );
+
+  // Set CORS headers for the HTTP response.
+  response.headers.set("Access-Control-Allow-Origin", "*");
+  response.headers.set("Vary", "Origin");
+
+  return response;
+});
+```
+
+#### 6. Routing the HTTP Action (`convex/http.ts`)
+
+The `streamChat` HTTP action needs to be mapped to a public URL.
+
+```typescript
+// example/convex/http.ts
+import { httpRouter } from "convex/server";
+import { streamChat } from "./chat"; // Our HTTP action
+import { httpAction } from "./_generated/server"; // Used below for the CORS preflight handler
+
+const http = httpRouter();
+
+// Route for initiating the chat stream. The frontend will POST to this path.
+http.route({
+  path: "/chat-stream",
+  method: "POST",
+  handler: streamChat,
+});
+
+// Standard CORS OPTIONS handler for the /chat-stream path.
+http.route({
+  path: "/chat-stream",
+  method: "OPTIONS",
+  handler: httpAction(async (_, request) => {
+    // ... (Full CORS header logic as in the example file) ...
+    return new Response(null, { /* ... CORS headers ... */ });
+  }),
+});
+
+export default http;
+```
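+
+The elided preflight logic typically just echoes the standard CORS headers back to the browser. A representative sketch follows; the exact header set is an assumption, and the example's `convex/http.ts` is authoritative:
+
+```typescript
+// Sketch: a typical CORS preflight handler for the /chat-stream route.
+http.route({
+  path: "/chat-stream",
+  method: "OPTIONS",
+  handler: httpAction(async (_ctx, request) => {
+    // Browsers require these headers before allowing the
+    // cross-origin POST that starts the stream.
+    return new Response(null, {
+      status: 204,
+      headers: {
+        "Access-Control-Allow-Origin": request.headers.get("Origin") ?? "*",
+        "Access-Control-Allow-Methods": "POST, OPTIONS",
+        "Access-Control-Allow-Headers": "Content-Type",
+        "Vary": "Origin",
+      },
+    });
+  }),
+});
+```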
+
+#### 7. Retrieving Persisted Stream Content (`convex/streaming.ts -> getStreamBody`)
+
+The `getStreamBody` query (defined earlier in `convex/streaming.ts`) allows any client (especially non-driving ones, or any client after a reload) to fetch the complete, persisted text of a stream. The `useStream` hook on the frontend uses this as its source of truth when not directly driving the HTTP stream.
+
+```typescript
+// example/convex/streaming.ts (relevant part)
+export const getStreamBody = query({
+  args: { streamId: StreamIdValidator },
+  handler: async (ctx, args) => {
+    // `streamingComponent.getStreamBody` reconstructs the full text
+    // from the internally managed 'chunks' table for the given streamId.
+    return await streamingComponent.getStreamBody(ctx, args.streamId as StreamId);
+  },
+});
+```
+The `convex/messages.ts -> getHistory` internal query also uses `streamingComponent.getStreamBody`, fetching previously persisted AI responses when building conversation context (see the sketch below).
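+
+`getHistory` itself isn't reproduced in this guide. Conceptually it joins each stored prompt with its persisted response; a simplified sketch (the exact example source may differ):
+
+```typescript
+// example/convex/messages.ts (simplified sketch of getHistory)
+export const getHistory = internalQuery({
+  args: {},
+  handler: async (ctx) => {
+    const messages = await ctx.db.query("userMessages").collect();
+    const history = [];
+    for (const message of messages) {
+      // Each user prompt becomes a "user" turn...
+      history.push({ role: "user" as const, content: message.prompt });
+      // ...followed by the persisted AI response, read back from the component.
+      const response = await streamingComponent.getStreamBody(
+        ctx,
+        message.responseStreamId as StreamId
+      );
+      if (response.text) {
+        history.push({ role: "assistant" as const, content: response.text });
+      }
+    }
+    return history;
+  },
+});
+```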
+
+### B. Frontend Implementation (React Components)
+
+#### 1. Triggering Message Sending & Managing "Driven" State (`src/components/ChatWindow.tsx`)
+
+When the user submits a prompt, `ChatWindow.tsx` calls the `sendMessage` mutation and keeps track of which messages this client session "drives" (i.e., initiated the stream for).
+
+```typescript
+// example/src/components/ChatWindow.tsx (simplified logic)
+import { useMutation, useQuery } from "convex/react";
+import { api } from "../../convex/_generated/api";
+import React, { useState } from "react";
+// ... ServerMessage import
+
+export default function ChatWindow() {
+  const [inputValue, setInputValue] = useState("");
+  // `drivenIds` stores the `_id` of `userMessages` documents for which
+  // this client session initiated the AI response stream.
+  const [drivenIds, setDrivenIds] = useState<Set<string>>(new Set());
+  const messages = useQuery(api.messages.listMessages);
+  const sendMessageMutation = useMutation(api.messages.sendMessage);
+
+  const handleSubmit = async (e: React.FormEvent) => {
+    e.preventDefault();
+    // ... (input validation) ...
+    const userMessageDocId = await sendMessageMutation({ prompt: inputValue });
+    setInputValue("");
+
+    // Add the ID of the new userMessage document to `drivenIds`.
+    // This signals that the `ServerMessage` component for this AI response
+    // should be "driven" by this client.
+    setDrivenIds((prev) => new Set(prev).add(userMessageDocId));
+    // ... (setIsStreaming(true) for UI feedback)
+  };
+
+  // In the render method, when mapping through `messages`:
+  //
+  //   <ServerMessage
+  //     isDriven={drivenIds.has(message._id)} // Pass the "driven" status
+  //     // ... other props
+  //   />
+  // ...
+}
+```
+
+#### 2. Displaying the Streamed Response with `useStream` (`src/components/ServerMessage.tsx`)
+
+This component is responsible for rendering the AI's response. It uses the `useStream` hook from `@convex-dev/persistent-text-streaming/react`.
+
+```typescript
+// example/src/components/ServerMessage.tsx
+import { getConvexSiteUrl } from "@/lib/utils"; // Helper for HTTP endpoint URL
+import { StreamId } from "@convex-dev/persistent-text-streaming";
+import { useStream } from "@convex-dev/persistent-text-streaming/react"; // The hook!
+import { api } from "../../convex/_generated/api";
+import { Doc } from "../../convex/_generated/dataModel";
+import { useMemo, useEffect } from "react";
+import Markdown from "react-markdown";
+
+export function ServerMessage({
+  message, // The Doc<"userMessages"> from the database
+  isDriven, // Boolean: true if this client session initiated this AI stream
+  stopStreaming, // Callback for UI state
+  scrollToBottom, // Callback for UI state
+}: {
+  message: Doc<"userMessages">;
+  isDriven: boolean;
+  stopStreaming: () => void;
+  scrollToBottom: () => void;
+}) {
+  // Construct the full URL to the HTTP streaming endpoint (`/chat-stream`)
+  const streamEndpointUrl = useMemo(() => {
+    try {
+      return new URL(`${getConvexSiteUrl()}/chat-stream`);
+    } catch (e) { /* ... error handling ... */ return null; }
+  }, []);
+
+  // The `useStream` hook manages fetching and displaying the stream.
+  const { text, status } = useStream(
+    api.streaming.getStreamBody, // 1. Convex query for persisted data (used if !isDriven or as fallback)
+    streamEndpointUrl!, // 2. URL of your HTTP streaming action (e.g., convex/chat.ts -> streamChat)
+    isDriven, // 3. Critical flag: if true, the hook POSTs to streamUrl to drive the HTTP stream;
+    //    if false, it relies on the query and Convex subscriptions.
+    message.responseStreamId as StreamId // 4. The ID of the stream to display.
+  );
+
+  // ... (useEffect hooks for UI updates like scrolling and stopping streaming indicator) ...
+
+  return (
+    <div>
+      {text || (status === "pending" && isDriven ? "AI is thinking..." : "")}
+      {/* ... (display error/timeout based on `status`) ... */}
+    </div>
+  );
+}
+```
+The `getConvexSiteUrl()` helper in `src/lib/utils.ts` determines the base URL for the HTTP endpoint, accounting for local development (port + 1) versus cloud deployments (`.site` TLD), as sketched below.
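+
+A plausible implementation of that helper, assuming the Vite-standard `VITE_CONVEX_URL` environment variable holds the deployment URL (this is a sketch; the actual `src/lib/utils.ts` may differ):
+
+```typescript
+// example/src/lib/utils.ts (sketch)
+export function getConvexSiteUrl(): string {
+  const convexUrl = import.meta.env.VITE_CONVEX_URL as string;
+  if (convexUrl.includes(".cloud")) {
+    // Cloud deployments expose HTTP actions on the matching `.site` domain.
+    return convexUrl.replace(/\.cloud$/, ".site");
+  }
+  // Local dev: HTTP actions are served on the deployment port + 1.
+  const url = new URL(convexUrl);
+  url.port = String(Number(url.port) + 1);
+  return url.origin;
+}
+```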
+
+### C. Viewing Component Data in the Convex Dashboard
+
+You can inspect the data managed by the `@convex-dev/persistent-text-streaming` component directly in your Convex Dashboard:
+
+1. Click on your Convex project in the [Convex Dashboard](https://dashboard.convex.dev) and go to the "Data" section.
+2. Use the component selector (above the table list) to switch from `app` to `persistentTextStreaming`.
+
+You'll find two tables:
+
+* **`streams`**: Records for each stream, tracking its overall status (e.g., `pending`, `streaming`, `done`, `error`, `timeout`).
+* **`chunks`**: The actual text content, broken into pieces and linked to a `streamId`.
 
-This chat app talks to OpenAI's API. You need to set the `OPENAI_API_KEY` environment variable
-inside your Convex backend.
+The `streamingComponent.getStreamBody()` function (and thus `api.streaming.getStreamBody`) reads from these tables to reconstruct the full message.
 
-1. Download an API key from [OpenAI](https://platform.openai.com/api-keys).
-2. Run `npx convex env set OPENAI_API_KEY=<your-api-key>`
+## Key Takeaways from this Example
 
-Then you should be able to chat successfully with the app. Pop up the Convex dashboard to
-debug any issues (the `npx convex dashboard` command will get you to the right place).
+* **Dual Path for Optimal UX:** The "driving" client gets ultra-low latency via HTTP streaming, while persisted data efficiently supports reloads and observation by other clients.
+* **Simplified Complexity:** The component handles the intricacies of managing both the HTTP stream and database persistence.
+* **Clear Separation of Concerns:** Your application logic focuses on when to create streams and how to generate content, while the component handles the streaming mechanics.
+* **Leverages Convex Strengths:** Uses Convex mutations for transactional stream creation, HTTP actions for direct streaming, queries for data retrieval, and Convex's reactivity for updating non-driving clients.