muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Alembic migrations environment for CX AI Agent
"""
import asyncio
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# Add the parent of the directory containing this file to the import path so
# the application's packages can be imported. abspath() guards against a
# relative __file__, where the two dirname() calls would collapse to "".
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import models
from mcp.database.models import Base

# Alembic Config object
config = context.config

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata the migration context is configured with
target_metadata = Base.metadata

# Get database URL from environment or use the local SQLite default
database_url = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./data/cx_agent.db")

# Online migrations build an async engine, so a PostgreSQL URL that names no
# driver ("postgres://" or "postgresql://") is rewritten to use asyncpg.
# URLs that already name a driver (e.g. "postgresql+asyncpg://") are untouched.
if database_url.startswith(("postgres://", "postgresql://")):
    database_url = "postgresql+asyncpg://" + database_url.split("://", 1)[1]

# Override sqlalchemy.url in alembic config. The value goes through
# ConfigParser interpolation, so a literal "%" (e.g. a URL-encoded password)
# must be escaped as "%%"; get_main_option() returns it unescaped again.
config.set_main_option("sqlalchemy.url", database_url.replace("%", "%%"))
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The migration context is configured from the URL stored in the Alembic
    config rather than from an Engine, so no DBAPI has to be available.
    Calls to context.execute() emit the given string to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the migration context on *connection* and run the migrations.

    Type and server-default comparison are both switched on in the
    configured context.
    """
    comparison_flags = dict(compare_type=True, compare_server_default=True)
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        **comparison_flags,
    )

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with an async engine.

    Builds an async engine from the Alembic ini section (with the URL
    replaced by the module-level ``database_url``), opens a connection and
    runs the synchronous ``do_run_migrations`` on it via ``run_sync``.
    The engine is disposed even when the migration raises.
    """
    # get_section() may return None when the ini section is missing; fall
    # back to an empty mapping so the URL can still be assigned.
    configuration = config.get_section(config.config_ini_section) or {}
    configuration["sqlalchemy.url"] = database_url

    connectable = async_engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose the engine on both the success and the failure path.
        await connectable.dispose()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode by driving the async runner to completion."""
    migration_coro = run_async_migrations()
    asyncio.run(migration_coro)
# Pick the migration path for this invocation: online unless the context
# reports offline mode.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()