Spaces:
Running
Running
| import cohere | |
| import asyncio | |
# Module-level cache for the Cohere client; lazily initialized by
# get_client() so importing this module never requires a valid API key.
_client = None
def get_client(api_key):
    """Return a cached Cohere ClientV2, rebuilding it when the API key changes.

    The previous implementation cached the first client forever, so a later
    call with a *different* api_key silently kept using the old credentials.
    The cache now remembers which key the client was built with and rebuilds
    the client whenever the key differs.

    Args:
        api_key: Cohere API key used to authenticate the client.

    Returns:
        A cohere.ClientV2 instance authenticated with api_key.
    """
    global _client
    # _client is None before first use, then holds an (api_key, client) pair.
    if _client is None or _client[0] != api_key:
        _client = (api_key, cohere.ClientV2(api_key))
    return _client[1]
def send_message_stream(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Yield response text chunks streamed from the Cohere chat API.

    Args:
        system_message: System prompt placed first in the message list.
        user_message: Latest user turn, appended after the history.
        conversation_history: Prior turns as role/content dicts.
        api_key: Cohere API key (the client is cached via get_client).
        model_name: Cohere model identifier.

    Yields:
        str: Each text delta as it arrives from the stream.
    """
    client = get_client(api_key)

    # Assemble the prompt: system message first, then history, then new turn.
    chat_messages = (
        [{"role": "system", "content": system_message}]
        + list(conversation_history)
        + [{"role": "user", "content": user_message}]
    )

    response_stream = client.chat_stream(
        model=model_name,
        messages=chat_messages,
    )

    # Accumulate the complete reply while forwarding each delta to the caller.
    accumulated = ""
    for event in response_stream:
        if event.type == "content-delta":
            piece = event.delta.message.content.text
            accumulated += piece
            yield piece
async def send_message_stream_async(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Async wrapper that streams chunks from the synchronous Cohere stream.

    Runs send_message_stream in a background thread and forwards each text
    chunk through an asyncio.Queue so the event loop is never blocked.

    Fixes over the original version:
    - uses asyncio.get_running_loop(); get_event_loop() is deprecated inside
      coroutines and may bind the wrong loop,
    - replaces the StopIteration-instance sentinel with a private object()
      sentinel plus an explicit error slot (StopIteration has special
      semantics around generators and could collide with real payloads),
    - marks the worker thread as daemon and joins it on normal completion so
      no non-daemon thread is leaked.

    Yields:
        str: Each streamed text chunk.

    Raises:
        Exception: Whatever the underlying synchronous stream raised.
    """
    import threading

    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    done = object()  # unique end-of-stream marker; cannot collide with chunks
    error = None     # exception raised by the worker thread, if any

    def _stream_worker():
        nonlocal error
        try:
            chunks = send_message_stream(
                system_message, user_message, conversation_history, api_key, model_name
            )
            for chunk in chunks:
                # put_nowait never raises here: asyncio.Queue is unbounded.
                loop.call_soon_threadsafe(queue.put_nowait, chunk)
        except Exception as exc:
            error = exc
        finally:
            # Always signal completion, even on failure.
            loop.call_soon_threadsafe(queue.put_nowait, done)

    worker = threading.Thread(target=_stream_worker, daemon=True)
    worker.start()

    while True:
        item = await queue.get()
        if item is done:
            break
        yield item

    # The worker has already pushed its sentinel, so this join is immediate.
    worker.join()
    # Re-raise the worker's exception in the consumer's context.
    if error is not None:
        raise error
def send_message(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Send a chat request to Cohere and return the complete reply text.

    Non-streaming counterpart of send_message_stream, kept for callers that
    want a single string rather than incremental chunks.

    Args:
        system_message: System prompt placed first in the message list.
        user_message: Latest user turn, appended after the history.
        conversation_history: Prior turns as role/content dicts.
        api_key: Cohere API key (the client is cached via get_client).
        model_name: Cohere model identifier.

    Returns:
        str: The text of the model's reply.
    """
    client = get_client(api_key)

    # System prompt, then history, then the new user turn.
    chat_messages = (
        [{"role": "system", "content": system_message}]
        + list(conversation_history)
        + [{"role": "user", "content": user_message}]
    )

    reply = client.chat(
        model=model_name,
        messages=chat_messages,
    )
    return reply.message.content[0].text
async def send_message_async(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Await the blocking send_message call without stalling the event loop.

    Delegates to asyncio.to_thread, which runs the synchronous request in a
    worker thread and resolves with its return value.

    Returns:
        str: The text of the model's reply (see send_message).
    """
    def _invoke():
        # Closure captures the arguments so to_thread needs no positional list.
        return send_message(
            system_message, user_message, conversation_history, api_key, model_name
        )

    return await asyncio.to_thread(_invoke)