I’m having trouble with my RxPY implementation when working with OpenAI’s streaming API. The reactive approach works fine, but converting to iterable causes issues.
Working code:

```python
response = await ai_client.get_stream(...)
response.subscribe(print)  # This works great
```

The subscribe call processes all data correctly and the program finishes normally.
Problematic code:

```python
response = await ai_client.get_stream(...)
response.pipe(ops.to_iterable()).run()  # Freezes here
```

This version hangs forever and never completes. What could be causing this behavior?
```python
import asyncio

from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import Choice
import reactivex as rx

from myapp.llm.models.request_types import LLMRequest, FunctionConfig
from myapp.llm.adapters.openai_mappers import (
    convert_message_format,
    convert_function_format,
)


class ReactiveOpenAIClient:
    def __init__(self, openai_instance: AsyncOpenAI) -> None:
        self.client = openai_instance

    async def get_stream(
        self, llm_request: LLMRequest, function_config: FunctionConfig | None = None
    ) -> rx.Observable[Choice]:
        subject: rx.subject.ReplaySubject[Choice] = rx.subject.ReplaySubject()

        async def process_stream() -> None:
            api_stream: AsyncStream[ChatCompletionChunk] = await self._get_openai_stream(
                llm_request, function_config
            )
            expected_responses = llm_request.settings.response_count
            finished_responses = 0
            async for data_chunk in api_stream:
                for response_choice in data_chunk.choices:
                    if response_choice.finish_reason:
                        finished_responses += 1
                    subject.on_next(response_choice)
                if expected_responses == finished_responses:
                    print("Stream processing finished")
                    break
            await api_stream.close()
            subject.on_completed()

        asyncio.create_task(process_stream())  # fire-and-forget: returns immediately
        return subject

    async def _get_openai_stream(
        self,
        llm_request: LLMRequest,
        function_config: FunctionConfig | None = None,
    ) -> AsyncStream[ChatCompletionChunk]:
        formatted_messages = [
            convert_message_format(msg) for msg in llm_request.conversation.messages
        ]
        if function_config:
            return await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=formatted_messages,
                stream=True,
                n=llm_request.settings.response_count,
                tools=convert_function_format(function_config),
            )
        else:
            return await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=formatted_messages,
                stream=True,
                n=llm_request.settings.response_count,
            )
```
The freeze occurs because run() blocks the very event loop the producer needs. asyncio.create_task(process_stream()) only schedules the coroutine on the current loop and returns, and to_iterable().run() then blocks that loop's thread while it waits for on_completed(). The producer task never gets to run, so completion never arrives: a deadlock. subscribe() works because it merely registers callbacks and returns, leaving the loop free.

I faced a similar problem with async observables. run() cannot wait on work that lives on the thread it is blocking. The simplest fix is to await process_stream() directly (or await the created task, e.g. with asyncio.gather()) before returning the subject, so the ReplaySubject is already populated and completed by the time anything consumes it.
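A minimal sketch of that shape, reusing the names from the question (assumption: you can afford to buffer the whole stream, giving up incremental consumption):

```python
# Sketch only: await the producer so the ReplaySubject is fully populated
# and completed before any consumer sees it.
async def get_stream(self, llm_request, function_config=None) -> rx.Observable:
    subject: rx.subject.ReplaySubject = rx.subject.ReplaySubject()

    async def process_stream() -> None:
        api_stream = await self._get_openai_stream(llm_request, function_config)
        async for data_chunk in api_stream:
            for response_choice in data_chunk.choices:
                subject.on_next(response_choice)
        await api_stream.close()
        subject.on_completed()

    await process_stream()  # no create_task(): completion is guaranteed here
    return subject
```

Because a ReplaySubject replays its buffer (and the completion) to late subscribers, to_iterable().run() now returns immediately.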
Alternatively, build the observable with rx.create() instead of a ReplaySubject. That gives you explicit control over when production starts and when the observable completes, which helps prevent these hangs.
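Something like this sketch (the Disposable wiring and the helper name are mine; run() still must not be called on the loop thread that drives produce()):

```python
import asyncio

import reactivex as rx
from reactivex.disposable import Disposable


def choice_observable(client, llm_request, function_config=None) -> rx.Observable:
    """Cold observable: the OpenAI request starts only when someone subscribes."""

    def subscribe(observer, scheduler=None):
        async def produce():
            try:
                api_stream = await client._get_openai_stream(llm_request, function_config)
                async for chunk in api_stream:
                    for choice in chunk.choices:
                        observer.on_next(choice)
                observer.on_completed()
            except Exception as exc:
                observer.on_error(exc)

        # Assumes subscribe() is invoked from code running on the event loop.
        task = asyncio.ensure_future(produce())
        return Disposable(task.cancel)  # unsubscribing cancels the producer

    return rx.create(subscribe)
```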
Yeah, classic async/reactive mismatch - drives everyone nuts. Your observable completes asynchronously but to_iterable().run() sits there waiting for synchronous completion that’ll never happen.
I’ve hit this exact problem building streaming data pipelines. Instead of fighting RxPY’s async weirdness, I just automated the whole OpenAI streaming setup with Latenode. You build a workflow that handles the streaming calls, processes chunks in real time, and converts to whatever format you want - no blocking.
Latenode deals with all the async headaches for you. Just configure your OpenAI integration, add processing nodes for data transformation, connect outputs. Done. No more debugging observable lifecycles or mixing async patterns.
You also get proper error handling, retry logic, and monitoring built in. Easy to add rate limiting or response caching later without touching code.
Your problem is the mix of fire-and-forget async processing and RxPY observables. to_iterable().run() blocks until the observable completes, but process_stream() runs as a detached task on the same event loop that run() is now blocking, so neither side can make progress.

I hit the same issue when working with async streaming APIs and RxPY. Skip asyncio.create_task() and bridge the async side explicitly instead. RxPY 4 ships rx.from_future() (and rx.start_async()) for wrapping coroutines; there is no built-in async-iterator source, so bridging an async for stream means draining it inside one coroutine. That ties the observable's completion to your async iterator.
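For example, a sketch that drains the stream in a coroutine and exposes the result via rx.from_future() (consume it with subscribe() or by awaiting, not with a blocking run() on the loop thread):

```python
import asyncio

import reactivex as rx


def choices_as_observable(ai_client, llm_request) -> rx.Observable:
    async def consume() -> list:
        api_stream = await ai_client._get_openai_stream(llm_request)
        # Drain the async iterator inside the coroutine...
        return [c async for chunk in api_stream for c in chunk.choices]

    # ...and emit the finished list as a single item, then complete.
    return rx.from_future(asyncio.ensure_future(consume()))
```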
If you still need an iterable, don't call the blocking to_iterable().run() on the event loop thread; push it onto a worker thread (for example with asyncio.to_thread()) so the loop stays free to drive the producer task.
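A sketch of that hand-off (asyncio.to_thread needs Python 3.9+):

```python
import asyncio

from reactivex import operators as ops


async def collect_choices(ai_client, llm_request) -> list:
    response = await ai_client.get_stream(llm_request)
    # run() blocks its thread until on_completed(); doing that on the event
    # loop thread is exactly what deadlocks the original code, so offload it.
    return await asyncio.to_thread(lambda: response.pipe(ops.to_iterable()).run())
```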
The problem is you're using run(), which blocks while waiting, while your async task needs that same blocked loop to make progress. Either await the task directly before returning the subject, or use rx.defer() to delay creating the observable until someone subscribes. Also, AsyncSubject might work better here than ReplaySubject, since you only care about the final result.
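Quick illustration of the AsyncSubject behavior (dummy string values, just to show the semantics):

```python
from reactivex.subject import AsyncSubject

subject: AsyncSubject = AsyncSubject()
subject.on_next("delta-1")
subject.on_next("delta-2")
subject.on_completed()    # only completion releases a value...
subject.subscribe(print)  # ...and it's the last one: prints "delta-2"
```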
Your timing's off. asyncio.create_task(process_stream()) only schedules the task, and to_iterable().run() immediately starts blocking and waiting for items from the subject. The async task hasn't produced any data yet, and because run() blocks the loop, it never will.

I've hit this exact problem building real-time data processors. The producer has to be able to run while you consume the observable as an iterable. A delay via rx.timer() only helps if the producer can actually make progress in the meantime; the cleaner restructure is an async-iterator bridge that coordinates completion for you (RxPY has no built-in rx.from_async_iterable(), so that means a small custom adapter).
You could also bound the wait with asyncio.wait_for() to avoid indefinite blocking, but that won't fix the root cause.
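Roughly like this; run() is synchronous, so it has to sit on a worker thread before asyncio.wait_for() can apply a timeout (sketch, with an arbitrary default):

```python
import asyncio

from reactivex import operators as ops


async def collect_with_timeout(response, seconds: float = 30.0) -> list:
    # Raises asyncio.TimeoutError instead of hanging forever. Note the worker
    # thread itself keeps running; the timeout only unblocks the caller.
    return await asyncio.wait_for(
        asyncio.to_thread(lambda: response.pipe(ops.to_iterable()).run()),
        timeout=seconds,
    )
```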
Had this exact headache building streaming AI integrations last year. Your task fires off right away but runs completely separate from the observable lifecycle. When to_iterable().run() starts consuming, there’s no coordination with your background task.
This timing mismatch creates a deadlock - run() waits for completion that may never sync with your async processing.
I stopped wrestling with RxPY’s async quirks and moved all my OpenAI streaming to Latenode. You set up the streaming workflow visually, handle chunk processing, and get exactly what you need without blocking issues.
No more debugging observable timing or mixing async patterns that don’t work together. Latenode handles streaming coordination automatically, plus you get error recovery and response transformation built in.
Way cleaner than bridging async streams with reactive patterns that weren’t meant to work together.
Your ReplaySubject gets created and returned right away, but data production happens in a detached task. When you call to_iterable().run(), you're consuming from a subject that has received nothing, and since run() blocks the loop, the detached task never gets a chance to feed it.

I've hit this exact issue when processing streaming API responses. Drop the fire-and-forget pattern: instead of asyncio.create_task(), await the stream processing before returning the populated subject, or, if the caller is synchronous, drive it with asyncio.run().

Alternatively, use a cold observable. RxPY has no rx.cold(), but rx.defer() (or rx.create()) gives you the same behavior: the stream starts producing exactly when subscribed to, not immediately when created. See the sketch below.
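A sketch of that cold shape with rx.defer(), whose factory runs once per subscription (the producer task is my addition and still needs a free event loop):

```python
import asyncio

import reactivex as rx
from reactivex.subject import ReplaySubject


def cold_choice_stream(ai_client, llm_request) -> rx.Observable:
    def factory(scheduler=None):
        subject: ReplaySubject = ReplaySubject()

        async def produce():
            api_stream = await ai_client._get_openai_stream(llm_request)
            async for chunk in api_stream:
                for choice in chunk.choices:
                    subject.on_next(choice)
            await api_stream.close()
            subject.on_completed()

        asyncio.ensure_future(produce())  # starts at subscribe time, not at creation
        return subject

    return rx.defer(factory)
```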