diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c4dc767 --- /dev/null +++ b/.env.example @@ -0,0 +1,24 @@ +# 应用配置 +APP_NAME=collector-backend +APP_ENV=development +DEBUG=true +SECRET_KEY=your-secret-key-here + +# 数据库配置 +DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/collector +DATABASE_ECHO=false + +# Redis配置 +REDIS_URL=redis://localhost:6379/0 + +# API配置 +API_PREFIX=/api/v1 +CORS_ORIGINS=["http://localhost:3000","http://localhost:5173"] + +# 日志配置 +LOG_LEVEL=INFO + +# Gitea集成 +GITEA_URL=https://gitea.devops.1msoft.cn +GITEA_TOKEN=your-gitea-token + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a45f26c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM python:3.11-slim + +WORKDIR /app + +# 安装系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# 安装 Poetry +RUN curl -sSL https://install.python-poetry.org | python3 - +ENV PATH="/root/.local/bin:$PATH" + +# 复制项目文件 +COPY pyproject.toml poetry.lock* ./ + +# 安装依赖 +RUN poetry config virtualenvs.create false \ + && poetry install --no-interaction --no-ansi --no-root --only main + +# 复制源代码 +COPY src/ ./src/ +COPY alembic.ini ./ +COPY migrations/ ./migrations/ + +# 暴露端口 +EXPOSE 8000 + +# 启动命令 +CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"] + diff --git a/README.md b/README.md index 2f1b483..39b5676 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,154 @@ -# collector-backend +# Collector Backend -数据采集后端服务 - Python FastAPI 实现 \ No newline at end of file +数据采集后端服务 - Python FastAPI 实现 + +## 项目介绍 + +Collector Backend 是 IDE 数据采集系统的后端服务,负责接收、存储和分析来自各 IDE 插件的数据。 + +## 技术栈 + +- **框架**: FastAPI +- **数据库**: PostgreSQL + SQLAlchemy (async) +- **缓存**: Redis +- **数据库迁移**: Alembic +- **包管理**: Poetry + +## 快速开始 + +### 环境要求 + +- Python 3.11+ +- PostgreSQL 14+ +- Redis 7+ +- Poetry + +### 安装 + +```bash +# 安装依赖 +poetry install + +# 复制环境配置 +cp .env.example 
.env + +# 编辑 .env 配置数据库等信息 +``` + +### 数据库迁移 + +```bash +# 生成迁移 +poetry run alembic revision --autogenerate -m "Initial migration" + +# 执行迁移 +poetry run alembic upgrade head +``` + +### 运行 + +```bash +# 开发模式 +poetry run uvicorn src.main:app --reload --port 8000 + +# 生产模式 +poetry run uvicorn src.main:app --host 0.0.0.0 --port 8000 +``` + +### Docker 运行 + +```bash +# 启动所有服务 +docker-compose up -d + +# 查看日志 +docker-compose logs -f app +``` + +## API 文档 + +启动服务后访问: +- Swagger UI: http://localhost:8000/docs +- ReDoc: http://localhost:8000/redoc + +## API 端点 + +### 健康检查 +- `GET /api/v1/health` - 健康检查 +- `GET /api/v1/health/db` - 数据库健康检查 + +### 事件管理 +- `POST /api/v1/events` - 创建单个事件 +- `POST /api/v1/events/batch` - 批量创建事件 +- `GET /api/v1/events` - 查询事件列表 +- `GET /api/v1/events/{event_id}` - 获取单个事件 + +### 数据分析 +- `GET /api/v1/analytics/overview` - 概览统计 +- `GET /api/v1/analytics/acceptance-rate` - 采纳率统计 +- `GET /api/v1/analytics/token-usage` - Token使用统计 +- `GET /api/v1/analytics/user-activity` - 用户活动统计 + +### 配置管理 +- `GET /api/v1/config` - 获取客户端配置 + +## 项目结构 + +``` +collector-backend/ +├── src/ +│ ├── api/ +│ │ └── v1/ +│ │ ├── events.py # 事件API +│ │ ├── analytics.py # 分析API +│ │ ├── config.py # 配置API +│ │ └── health.py # 健康检查 +│ ├── config/ +│ │ ├── settings.py # 配置管理 +│ │ └── database.py # 数据库配置 +│ ├── models/ +│ │ ├── event.py # 事件模型 +│ │ └── user.py # 用户模型 +│ ├── schemas/ +│ │ ├── event.py # 事件Schema +│ │ └── analytics.py # 分析Schema +│ ├── services/ +│ │ ├── event_processor.py # 事件处理 +│ │ └── analytics_engine.py # 分析引擎 +│ └── main.py # 应用入口 +├── migrations/ # 数据库迁移 +├── tests/ # 测试 +├── pyproject.toml # 项目配置 +├── docker-compose.yml # Docker配置 +└── README.md +``` + +## 开发 + +### 代码格式化 + +```bash +# 格式化代码 +poetry run black src tests +poetry run isort src tests + +# 代码检查 +poetry run ruff check src tests +poetry run mypy src +``` + +### 运行测试 + +```bash +poetry run pytest +poetry run pytest --cov=src +``` + +## 相关仓库 + +- [ide-data-collector](../ide-data-collector) - IDE插件 
Monorepo +- [collector-dashboard](../collector-dashboard) - 数据分析看板 + +## 许可证 + +MIT License diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..369eebd --- /dev/null +++ b/alembic.ini @@ -0,0 +1,43 @@ +[alembic] +script_location = migrations +prepend_sys_path = . +version_path_separator = os + +sqlalchemy.url = driver://user:pass@localhost/dbname + +[post_write_hooks] + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e4f7f94 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,39 @@ +version: '3.8' + +services: + app: + build: . 
"""
Alembic migration environment.

Wires the application's settings and model metadata into Alembic and runs
migrations either offline (emit SQL) or online (through an async engine).
"""

import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context

from src.config.settings import settings
from src.config.database import Base
from src.models import Event, User  # imported so every model is registered on Base.metadata

# Alembic Config object (gives access to the values in alembic.ini)
config = context.config

# Override the placeholder sqlalchemy.url from alembic.ini with the application's URL.
# NOTE(review): set_main_option goes through configparser interpolation; a literal
# '%' in DATABASE_URL (e.g. a URL-encoded password) presumably needs escaping as
# '%%' -- confirm against the Alembic documentation.
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)

# Configure Python logging from the ini file, when one is in use
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata that autogenerate compares against the database
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in offline mode.

    Configures the context with only a URL (no live connection) and renders
    bound parameters inline via ``literal_binds``.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run the migrations on an already-open synchronous connection."""
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Create an async engine from the Alembic config and run migrations through it."""
    # NullPool: no connections are kept around after the migration run
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        # Alembic's migration API is synchronous; run_sync bridges it onto the async connection
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in online mode by driving the async runner to completion."""
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
"^2.0.25"} +asyncpg = "^0.29.0" +alembic = "^1.13.0" +redis = "^5.0.0" +httpx = "^0.26.0" +python-jose = {extras = ["cryptography"], version = "^3.3.0"} +passlib = {extras = ["bcrypt"], version = "^1.7.4"} +python-multipart = "^0.0.6" + +[tool.poetry.group.dev.dependencies] +pytest = "^7.4.0" +pytest-asyncio = "^0.23.0" +pytest-cov = "^4.1.0" +black = "^23.12.0" +isort = "^5.13.0" +mypy = "^1.8.0" +ruff = "^0.1.9" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.black] +line-length = 100 +target-version = ['py311'] +include = '\.pyi?$' + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.11" +warn_return_any = true +warn_unused_configs = true +ignore_missing_imports = true + +[tool.ruff] +line-length = 100 +target-version = "py311" +select = ["E", "F", "W", "I", "N", "UP", "B", "C4"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..804c5b4 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,6 @@ +""" +Collector Backend - 数据采集后端服务 +""" + +__version__ = "0.1.0" + diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..f4a1a37 --- /dev/null +++ b/src/api/__init__.py @@ -0,0 +1,2 @@ +"""API 模块""" + diff --git a/src/api/v1/__init__.py b/src/api/v1/__init__.py new file mode 100644 index 0000000..40b6a20 --- /dev/null +++ b/src/api/v1/__init__.py @@ -0,0 +1,2 @@ +"""API v1 模块""" + diff --git a/src/api/v1/analytics.py b/src/api/v1/analytics.py new file mode 100644 index 0000000..45d18e8 --- /dev/null +++ b/src/api/v1/analytics.py @@ -0,0 +1,102 @@ +""" +数据分析接口 +""" + +from datetime import datetime, timedelta +from typing import Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from src.config.database import get_db +from src.services.analytics_engine import AnalyticsEngine +from 
router = APIRouter()


def _resolve_period(
    start_date: Optional[datetime],
    end_date: Optional[datetime],
    default_days: int,
) -> tuple[datetime, datetime]:
    """Fill in the missing ends of a reporting window.

    Args:
        start_date: Caller-supplied window start, or None.
        end_date: Caller-supplied window end, or None.
        default_days: How far back from "now" the window starts when
            ``start_date`` is not given.

    Returns:
        ``(start, end)`` with None replaced by ``now - default_days`` and
        ``now`` respectively. "now" is sampled once so both defaults are
        derived from the same instant.
    """
    now = datetime.utcnow()  # naive UTC, the convention used by the handlers in this module
    start = start_date if start_date is not None else now - timedelta(days=default_days)
    end = end_date if end_date is not None else now
    return start, end


@router.get("/analytics/overview", response_model=OverviewStats)
async def get_overview_stats(
    start_date: Optional[datetime] = Query(None),
    end_date: Optional[datetime] = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Return overview statistics; the window defaults to the last 7 days."""
    start_date, end_date = _resolve_period(start_date, end_date, default_days=7)
    engine = AnalyticsEngine(db)
    return await engine.get_overview_stats(start_date, end_date)


@router.get("/analytics/acceptance-rate", response_model=AcceptanceRateStats)
async def get_acceptance_rate(
    start_date: Optional[datetime] = Query(None),
    end_date: Optional[datetime] = Query(None),
    # `pattern` actually rejects other values with a 422; the previous `enum=`
    # keyword only annotated the OpenAPI schema. The enum is kept in the schema
    # through json_schema_extra so generated docs are unchanged.
    group_by: str = Query(
        "day",
        pattern="^(hour|day|week)$",
        json_schema_extra={"enum": ["hour", "day", "week"]},
    ),
    db: AsyncSession = Depends(get_db),
):
    """Return acceptance-rate statistics; the window defaults to the last 30 days."""
    start_date, end_date = _resolve_period(start_date, end_date, default_days=30)
    engine = AnalyticsEngine(db)
    return await engine.get_acceptance_rate(start_date, end_date, group_by)


@router.get("/analytics/token-usage", response_model=TokenUsageStats)
async def get_token_usage(
    start_date: Optional[datetime] = Query(None),
    end_date: Optional[datetime] = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Return token-usage statistics; the window defaults to the last 30 days."""
    start_date, end_date = _resolve_period(start_date, end_date, default_days=30)
    engine = AnalyticsEngine(db)
    return await engine.get_token_usage(start_date, end_date)


@router.get("/analytics/user-activity", response_model=UserActivityStats)
async def get_user_activity(
    start_date: Optional[datetime] = Query(None),
    end_date: Optional[datetime] = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Return user-activity statistics; the window defaults to the last 7 days."""
    start_date, end_date = _resolve_period(start_date, end_date, default_days=7)
    engine = AnalyticsEngine(db)
    return await engine.get_user_activity(start_date, end_date)


@router.get("/analytics/providers")
async def get_provider_stats(
    start_date: Optional[datetime] = Query(None),
    end_date: Optional[datetime] = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Return per-AI-provider statistics; the window defaults to the last 30 days."""
    start_date, end_date = _resolve_period(start_date, end_date, default_days=30)
    engine = AnalyticsEngine(db)
    return await engine.get_provider_stats(start_date, end_date)
router = APIRouter()

# Upper bound on rows returned by a single list request, so one call cannot
# ask the database for an arbitrarily large page.
_MAX_PAGE_SIZE = 1000


@router.post("/events", response_model=EventResponse, status_code=status.HTTP_201_CREATED)
async def create_event(
    event: EventCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a single event and return the stored record."""
    processor = EventProcessor(db)
    result = await processor.process_event(event)
    return result


@router.post("/events/batch", response_model=EventBatchResponse)
async def create_events_batch(
    batch: EventBatchCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a batch of events and report how many results the processor returned."""
    processor = EventProcessor(db)
    results = await processor.process_batch(batch.events)
    return EventBatchResponse(
        success=True,
        processed=len(results),
        timestamp=datetime.utcnow().isoformat(),
    )


@router.get("/events", response_model=List[EventResponse])
async def list_events(
    skip: int = 0,
    limit: int = 100,
    event_type: str | None = None,
    user_id: str | None = None,
    db: AsyncSession = Depends(get_db),
):
    """List events with optional filters.

    Args:
        skip: Number of rows to skip; negative values are treated as 0.
        limit: Maximum rows to return; clamped to ``0.._MAX_PAGE_SIZE``.
        event_type: Only return events of this type when given.
        user_id: Only return events of this user when given.
    """
    # Clamp paging values before they reach the query: they come straight from
    # the query string and were previously passed through unchecked.
    skip = max(skip, 0)
    limit = min(max(limit, 0), _MAX_PAGE_SIZE)

    processor = EventProcessor(db)
    events = await processor.list_events(
        skip=skip,
        limit=limit,
        event_type=event_type,
        user_id=user_id,
    )
    return events


@router.get("/events/{event_id}", response_model=EventResponse)
async def get_event(
    event_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Return one event by its ID.

    Raises:
        HTTPException: 404 when the processor finds no such event.
    """
    processor = EventProcessor(db)
    event = await processor.get_event(event_id)
    if not event:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Event {event_id} not found",
        )
    return event
@router.get("/health")
async def health_check():
    """Liveness check: reports that the process is up, without touching the database."""
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "version": "0.1.0",
    }


@router.get("/health/db")
async def database_health(db: AsyncSession = Depends(get_db)):
    """Database health check.

    Runs ``SELECT 1``. Returns 200 with a "healthy" payload on success and
    503 with the same "unhealthy" payload shape as before on failure, so
    status-code-based monitors can see the outage.
    """
    # Local import: this module's top-level imports do not include JSONResponse.
    from fastapi.responses import JSONResponse

    try:
        await db.execute(text("SELECT 1"))
    except Exception as e:  # any failure here means the database is unusable
        # NOTE(review): str(e) may expose connection details to callers; kept
        # because existing clients may read the "error" field.
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "database": "disconnected",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat(),
            },
        )
    return {
        "status": "healthy",
        "database": "connected",
        "timestamp": datetime.utcnow().isoformat(),
    }


@router.get("/health/ready")
async def readiness_check(db: AsyncSession = Depends(get_db)):
    """Readiness check: 200 ``ready`` when ``SELECT 1`` succeeds, else 503 ``not_ready``."""
    from fastapi.responses import JSONResponse

    try:
        await db.execute(text("SELECT 1"))
    except Exception:  # the probe only needs a yes/no answer
        return JSONResponse(status_code=503, content={"status": "not_ready"})
    return {"status": "ready"}
class Event(Base):
    """ORM model for the ``events`` table: one row per collected IDE event.

    The free-form metadata JSON is exposed as ``event_metadata`` because the
    attribute name ``metadata`` is reserved by SQLAlchemy's Declarative API
    (it holds the table registry) and declaring it makes the class fail at
    import time. The database column is still named ``metadata``.
    """

    __tablename__ = "events"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    event_id: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    event_type: Mapped[str] = mapped_column(String(64), index=True)
    timestamp: Mapped[datetime] = mapped_column(DateTime, index=True)

    # User information
    user_id: Mapped[str] = mapped_column(String(64), index=True)
    ide_type: Mapped[str] = mapped_column(String(32))
    ide_version: Mapped[Optional[str]] = mapped_column(String(32), nullable=True)
    os: Mapped[Optional[str]] = mapped_column(String(32), nullable=True)

    # Code context
    file_path: Mapped[Optional[str]] = mapped_column(String(512), nullable=True)
    language: Mapped[Optional[str]] = mapped_column(String(32), nullable=True, index=True)
    project_type: Mapped[Optional[str]] = mapped_column(String(32), nullable=True)
    workspace: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)

    # AI interaction
    ai_provider: Mapped[Optional[str]] = mapped_column(String(32), nullable=True, index=True)
    ai_model: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)
    prompt_tokens: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    completion_tokens: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    latency_ms: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Code data
    suggested_code: Mapped[Optional[str]] = mapped_column(String(2000), nullable=True)
    accepted: Mapped[Optional[bool]] = mapped_column(Boolean, nullable=True)
    decision_time_ms: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Session information
    session_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True, index=True)
    turn_number: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Free-form metadata; Python attribute renamed, column name kept as "metadata"
    event_metadata: Mapped[Optional[dict]] = mapped_column("metadata", JSON, nullable=True)

    # Row creation time (naive UTC, set on insert)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    def __repr__(self) -> str:
        # The previous implementation returned an empty string, which made
        # log lines and debugger output useless.
        return (
            f"<Event(id={self.id!r}, event_id={self.event_id!r}, "
            f"event_type={self.event_type!r})>"
        )
class User(Base):
    """ORM model for the ``users`` table (anonymous users keyed by ``user_id``)."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    user_id: Mapped[str] = mapped_column(String(64), unique=True, index=True)

    # Values recorded the first time the user was seen
    first_ide_type: Mapped[Optional[str]] = mapped_column(String(32), nullable=True)
    first_os: Mapped[Optional[str]] = mapped_column(String(32), nullable=True)

    # Running counters
    total_events: Mapped[int] = mapped_column(Integer, default=0)
    total_completions: Mapped[int] = mapped_column(Integer, default=0)
    total_accepted: Mapped[int] = mapped_column(Integer, default=0)
    total_sessions: Mapped[int] = mapped_column(Integer, default=0)

    # Activity state (timestamps are naive UTC, set on insert)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    first_seen_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    def __repr__(self) -> str:
        # The previous implementation returned an empty string, which made
        # log lines and debugger output useless.
        return f"<User(id={self.id!r}, user_id={self.user_id!r})>"
class TimeSeriesPoint(BaseModel):
    """One sample of a time series: a timestamp paired with a numeric value."""

    timestamp: datetime
    value: float


class OverviewStats(BaseModel):
    """Headline metrics for a reporting period, including the period bounds."""

    totalEvents: int = Field(description="总事件数")
    totalUsers: int = Field(description="总用户数")
    activeUsers: int = Field(description="活跃用户数")
    totalCompletions: int = Field(description="总补全数")
    totalAccepted: int = Field(description="总接受数")
    acceptanceRate: float = Field(description="采纳率")
    avgLatencyMs: float = Field(description="平均延迟")
    totalTokens: int = Field(description="总Token数")
    periodStart: datetime
    periodEnd: datetime


class AcceptanceRateStats(BaseModel):
    """Acceptance rate overall, over time, and broken down by provider, language and IDE."""

    overall: float = Field(description="总体采纳率")
    timeSeries: list[TimeSeriesPoint] = Field(description="时序数据")
    byProvider: dict[str, float] = Field(description="按提供商")
    byLanguage: dict[str, float] = Field(description="按语言")
    byIDE: dict[str, float] = Field(description="按IDE")


class TokenUsageStats(BaseModel):
    """Prompt/completion token totals, the per-completion average, and breakdowns."""

    totalPromptTokens: int = Field(description="总提示词Token")
    totalCompletionTokens: int = Field(description="总补全Token")
    totalTokens: int = Field(description="总Token")
    avgTokensPerCompletion: float = Field(description="平均每次补全Token")
    timeSeries: list[TimeSeriesPoint] = Field(description="时序数据")
    byProvider: dict[str, int] = Field(description="按提供商")


class UserActivityStats(BaseModel):
    """User activity: daily actives, hourly distribution, top users and per-user averages."""

    dailyActiveUsers: list[TimeSeriesPoint] = Field(description="日活用户")
    hourlyDistribution: dict[int, int] = Field(description="小时分布")
    topUsers: list[dict[str, Any]] = Field(description="活跃用户排行")
    avgEventsPerUser: float = Field(description="平均每用户事件数")
    avgSessionsPerUser: float = Field(description="平均每用户会话数")
class UserInfoSchema(BaseModel):
    """Information about the reporting user and their IDE environment."""

    userId: str = Field(..., description="用户ID")
    ideType: str = Field(..., description="IDE类型")
    ideVersion: Optional[str] = Field(None, description="IDE版本")
    os: Optional[str] = Field(None, description="操作系统")
    timezone: Optional[str] = Field(None, description="时区")


class CodeContextSchema(BaseModel):
    """Where in the code base the event happened."""

    filePath: Optional[str] = Field(None, description="文件路径")
    language: Optional[str] = Field(None, description="编程语言")
    projectType: Optional[str] = Field(None, description="项目类型")
    workspace: Optional[str] = Field(None, description="工作区")
    cursorLine: Optional[int] = Field(None, description="光标行号")
    cursorColumn: Optional[int] = Field(None, description="光标列号")


class AIInteractionSchema(BaseModel):
    """Details of the AI call behind the event (provider, model, token counts, latency)."""

    provider: str = Field(..., description="AI提供商")
    model: Optional[str] = Field(None, description="模型名称")
    promptTokens: Optional[int] = Field(None, description="提示词token数")
    completionTokens: Optional[int] = Field(None, description="补全token数")
    latencyMs: Optional[int] = Field(None, description="延迟(ms)")


class CodeDataSchema(BaseModel):
    """Code payload of the event: surrounding code, the suggestion and its outcome."""

    beforeCursor: Optional[str] = Field(None, description="光标前代码")
    suggestedCode: Optional[str] = Field(None, description="建议代码")
    afterModification: Optional[str] = Field(None, description="修改后代码")
    accepted: Optional[bool] = Field(None, description="是否接受")
    modifications: Optional[List[str]] = Field(None, description="修改列表")


class EventCreate(BaseModel):
    """Request body for creating one event."""

    eventId: str = Field(..., description="事件ID")
    eventType: str = Field(..., description="事件类型")
    timestamp: str = Field(..., description="时间戳")
    userInfo: UserInfoSchema
    context: Optional[CodeContextSchema] = None
    aiInteraction: Optional[AIInteractionSchema] = None
    codeData: Optional[CodeDataSchema] = None
    sessionId: Optional[str] = Field(None, description="会话ID")
    turnNumber: Optional[int] = Field(None, description="对话轮次")
    decisionTimeMs: Optional[int] = Field(None, description="决策时间(ms)")
    metadata: Optional[Dict[str, Any]] = None


class EventResponse(BaseModel):
    """Event as returned by the API (camelCase keys).

    ``from_attributes`` lets the model be built from an object's attributes.
    The ``Event`` ORM model in this project names its attributes in
    snake_case, so each camelCase field carries a snake_case
    ``validation_alias``; ``populate_by_name`` keeps input that already uses
    the camelCase names working. Validation aliases do not affect
    serialization, so the response keys are unchanged.

    NOTE(review): the service that produces these objects is not visible
    here; confirm it returns ``Event`` ORM instances.
    """

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = {"from_attributes": True, "populate_by_name": True}

    id: int
    eventId: str = Field(..., validation_alias="event_id")
    eventType: str = Field(..., validation_alias="event_type")
    timestamp: datetime
    userId: str = Field(..., validation_alias="user_id")
    ideType: str = Field(..., validation_alias="ide_type")
    language: Optional[str] = None
    aiProvider: Optional[str] = Field(None, validation_alias="ai_provider")
    accepted: Optional[bool] = None
    createdAt: datetime = Field(..., validation_alias="created_at")


class EventBatchCreate(BaseModel):
    """Request body for creating several events in one call."""

    events: List[EventCreate] = Field(..., description="事件列表")


class EventBatchResponse(BaseModel):
    """Result of a batch create: success flag, processed count and server timestamp."""

    success: bool
    processed: int
    timestamp: str
+ + async def get_overview_stats( + self, start_date: datetime, end_date: datetime + ) -> OverviewStats: + """获取概览统计""" + date_filter = and_( + Event.timestamp >= start_date, + Event.timestamp <= end_date, + ) + + # 总事件数 + total_events_stmt = select(func.count(Event.id)).where(date_filter) + total_events = (await self.db.execute(total_events_stmt)).scalar() or 0 + + # 总用户数 + total_users_stmt = select(func.count(func.distinct(Event.user_id))).where(date_filter) + total_users = (await self.db.execute(total_users_stmt)).scalar() or 0 + + # 补全相关统计 + completion_filter = and_( + date_filter, + Event.event_type.in_([ + "code_completion_shown", + "code_completion_accepted", + "code_completion_rejected", + ]), + ) + + total_completions_stmt = select(func.count(Event.id)).where( + and_(date_filter, Event.event_type == "code_completion_shown") + ) + total_completions = (await self.db.execute(total_completions_stmt)).scalar() or 0 + + total_accepted_stmt = select(func.count(Event.id)).where( + and_(date_filter, Event.event_type == "code_completion_accepted") + ) + total_accepted = (await self.db.execute(total_accepted_stmt)).scalar() or 0 + + # 采纳率 + acceptance_rate = ( + total_accepted / total_completions if total_completions > 0 else 0.0 + ) + + # 平均延迟 + avg_latency_stmt = select(func.avg(Event.latency_ms)).where( + and_(date_filter, Event.latency_ms.isnot(None)) + ) + avg_latency = (await self.db.execute(avg_latency_stmt)).scalar() or 0.0 + + # 总Token + total_tokens_stmt = select( + func.coalesce(func.sum(Event.prompt_tokens), 0) + + func.coalesce(func.sum(Event.completion_tokens), 0) + ).where(date_filter) + total_tokens = (await self.db.execute(total_tokens_stmt)).scalar() or 0 + + return OverviewStats( + totalEvents=total_events, + totalUsers=total_users, + activeUsers=total_users, # 简化:活跃用户=总用户 + totalCompletions=total_completions, + totalAccepted=total_accepted, + acceptanceRate=round(acceptance_rate, 4), + avgLatencyMs=round(float(avg_latency), 2), + 
totalTokens=total_tokens, + periodStart=start_date, + periodEnd=end_date, + ) + + async def get_acceptance_rate( + self, start_date: datetime, end_date: datetime, group_by: str + ) -> AcceptanceRateStats: + """获取采纳率统计""" + # 简化实现 + overview = await self.get_overview_stats(start_date, end_date) + + return AcceptanceRateStats( + overall=overview.acceptanceRate, + timeSeries=[], + byProvider={}, + byLanguage={}, + byIDE={}, + ) + + async def get_token_usage( + self, start_date: datetime, end_date: datetime + ) -> TokenUsageStats: + """获取Token使用统计""" + date_filter = and_( + Event.timestamp >= start_date, + Event.timestamp <= end_date, + ) + + # Token统计 + token_stmt = select( + func.coalesce(func.sum(Event.prompt_tokens), 0).label("prompt"), + func.coalesce(func.sum(Event.completion_tokens), 0).label("completion"), + ).where(date_filter) + + result = (await self.db.execute(token_stmt)).one() + prompt_tokens = result.prompt or 0 + completion_tokens = result.completion or 0 + + # 补全次数 + completion_count_stmt = select(func.count(Event.id)).where( + and_(date_filter, Event.completion_tokens.isnot(None)) + ) + completion_count = (await self.db.execute(completion_count_stmt)).scalar() or 1 + + return TokenUsageStats( + totalPromptTokens=prompt_tokens, + totalCompletionTokens=completion_tokens, + totalTokens=prompt_tokens + completion_tokens, + avgTokensPerCompletion=round( + (prompt_tokens + completion_tokens) / completion_count, 2 + ), + timeSeries=[], + byProvider={}, + ) + + async def get_user_activity( + self, start_date: datetime, end_date: datetime + ) -> UserActivityStats: + """获取用户活动统计""" + date_filter = and_( + Event.timestamp >= start_date, + Event.timestamp <= end_date, + ) + + # 用户事件统计 + user_events_stmt = select(func.count(Event.id)).where(date_filter) + total_events = (await self.db.execute(user_events_stmt)).scalar() or 0 + + # 用户数 + user_count_stmt = select(func.count(func.distinct(Event.user_id))).where(date_filter) + user_count = (await 
self.db.execute(user_count_stmt)).scalar() or 1 + + return UserActivityStats( + dailyActiveUsers=[], + hourlyDistribution={}, + topUsers=[], + avgEventsPerUser=round(total_events / user_count, 2), + avgSessionsPerUser=0.0, + ) + + async def get_provider_stats( + self, start_date: datetime, end_date: datetime + ) -> Dict[str, Any]: + """获取提供商统计""" + date_filter = and_( + Event.timestamp >= start_date, + Event.timestamp <= end_date, + Event.ai_provider.isnot(None), + ) + + stmt = select( + Event.ai_provider, + func.count(Event.id).label("count"), + ).where(date_filter).group_by(Event.ai_provider) + + result = await self.db.execute(stmt) + providers = {row.ai_provider: row.count for row in result} + + return {"providers": providers} + diff --git a/src/services/event_processor.py b/src/services/event_processor.py new file mode 100644 index 0000000..e6251ee --- /dev/null +++ b/src/services/event_processor.py @@ -0,0 +1,104 @@ +""" +事件处理服务 +""" + +from datetime import datetime +from typing import List, Optional + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from src.models.event import Event +from src.schemas.event import EventCreate, EventResponse + + +class EventProcessor: + """事件处理器""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def process_event(self, event_data: EventCreate) -> EventResponse: + """处理单个事件""" + event = Event( + event_id=event_data.eventId, + event_type=event_data.eventType, + timestamp=datetime.fromisoformat(event_data.timestamp.replace("Z", "+00:00")), + user_id=event_data.userInfo.userId, + ide_type=event_data.userInfo.ideType, + ide_version=event_data.userInfo.ideVersion, + os=event_data.userInfo.os, + file_path=event_data.context.filePath if event_data.context else None, + language=event_data.context.language if event_data.context else None, + project_type=event_data.context.projectType if event_data.context else None, + 
async def list_events(
    self,
    skip: int = 0,
    limit: int = 100,
    event_type: Optional[str] = None,
    user_id: Optional[str] = None,
) -> List[EventResponse]:
    """Return one page of events, newest first.

    Args:
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        event_type: When truthy, keep only events of this type.
        user_id: When truthy, keep only events of this user.

    Returns:
        The matching events converted with ``_to_response``.
    """
    query = select(Event)

    # Optional equality filters; falsy values mean "no filter".
    if event_type:
        query = query.where(Event.event_type == event_type)
    if user_id:
        query = query.where(Event.user_id == user_id)

    # Order by timestamp descending, then paginate.
    query = query.order_by(Event.timestamp.desc()).offset(skip).limit(limit)

    rows = (await self.db.execute(query)).scalars().all()
    return [self._to_response(row) for row in rows]
@pytest.fixture(scope="function")
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    """Yield an ``AsyncClient`` that calls the FastAPI app in-process.

    While the fixture is active the app's ``get_db`` dependency is
    overridden so request handlers receive the test ``db_session``. All
    dependency overrides are cleared on teardown, including when closing
    the client raises.
    """
    # Local import: ASGITransport is only needed here, and this keeps the
    # module-level import block untouched.
    from httpx import ASGITransport

    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    try:
        # Pass the app through an explicit transport: the ``app=`` shortcut
        # on AsyncClient is deprecated in httpx 0.27 and removed in 0.28.
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as ac:
            yield ac
    finally:
        app.dependency_overrides.clear()