I’m trying to build a streaming chat feature in my Next.js app using tRPC. The AI responses take too long without streaming, so I need to send data chunks as they arrive.
Here’s my current backend setup:
    import { PassThrough } from 'stream';
    import { z } from 'zod';
    // router/protectedProcedure come from my tRPC setup file
    import { router, protectedProcedure } from '../trpc';
    import { createAIClient } from 'utils/ai-client';

    export const chatRouter = router({
      sendMessage: protectedProcedure
        .input(z.object({ prompt: z.string() }))
        .mutation(async ({ ctx, input }) => {
          const aiStream = new PassThrough({ objectMode: true });
          const client = createAIClient();

          // Ask the API for a streamed (SSE) completion
          const aiResponse = await client.createCompletion(
            {
              messages: [{ role: 'user', content: input.prompt }],
              model: 'gpt-3.5-turbo',
              temperature: 0.7,
              stream: true,
            },
            { responseType: 'stream' }
          );

          let currentRole = '';
          aiResponse.data.on('data', (chunk: Buffer) => {
            // A single chunk can contain several "data: {...}" SSE lines
            const textLines = chunk
              .toString()
              .split('\n')
              .filter((l) => l.trim() !== '');

            textLines.forEach((line) => {
              const content = line.replace(/^data: /, '');
              if (content === '[DONE]') return;

              const parsedData = JSON.parse(content);
              if (parsedData.choices[0].finish_reason === 'stop') return;

              currentRole = parsedData.choices[0].delta.role || currentRole;
              if (parsedData.choices[0].delta.content) {
                // Re-emit each delta as a { role, text } object
                aiStream.push({
                  role: currentRole,
                  text: parsedData.choices[0].delta.content,
                });
              }
            });
          });

          // This is the part I'm unsure about: returning a Node stream
          // from a tRPC mutation
          return aiStream;
        }),
    });
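For context, the parsing above assumes the completion endpoint returns server-sent-event lines, one JSON delta per line plus a final [DONE] sentinel. Trimmed down (I've dropped fields like id and model), the raw chunks look roughly like this:

    data: {"choices":[{"delta":{"role":"assistant"},"index":0,"finish_reason":null}]}
    data: {"choices":[{"delta":{"content":"Hello"},"index":0,"finish_reason":null}]}
    data: [DONE]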
And my React component:
    useEffect(() => {
      const fetchStream = async () => {
        const responseStream = await trpc.chat.sendMessage.mutate({
          prompt: 'Hello world',
        });
        // I expected to be able to subscribe to chunks like a Node stream:
        responseStream.on('data', (chunk) => {
          console.log('Received:', chunk);
        });
      };
      fetchStream();
    }, []);
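For clarity, these are the chunk objects I push on the server and hoped to receive incrementally on the client (the type name here is just for illustration):

    // Shape of each object pushed into the PassThrough on the server
    type ChatChunk = {
      role: string; // e.g. 'assistant'
      text: string; // one streamed fragment of the reply
    };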
The problem is that responseStream.on throws an error saying it's not a function, so the streamed chunks never reach my React component. How can I make tRPC handle streaming data correctly? Is there a different approach I should use for real-time AI responses?
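One alternative I've been sketching is to skip tRPC for this single endpoint and stream from a plain Next.js API route, reading the response body on the client with fetch. This is only a sketch of the client side, and the /api/chat route it talks to is hypothetical (I haven't built it yet):

    // Client-side sketch: read a streamed response body chunk by chunk.
    // Assumes a hypothetical /api/chat route that pipes the AI stream through.
    async function readChatStream(prompt: string, onChunk: (text: string) => void) {
      const res = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt }),
      });
      if (!res.body) throw new Error('No response body to stream');

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        onChunk(decoder.decode(value, { stream: true }));
      }
    }

Would something like this be a reasonable direction, or is there a tRPC-native way to stream that I'm missing?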