Как мы агрегируем 25 AI-моделей через OpenRouter: практический гид по архитектуре
Разбираем архитектуру агрегатора AI-моделей на базе OpenRouter API: унификация text/image/video в единый интерфейс, failover между Claude, GPT-5 и DeepSeek, retry-политики с idempotency keys, кеширование промптов и финансовая модель с маржинальностью на разных провайдерах.
Введение: зачем агрегировать AI-модели
За последние два года количество доступных языковых моделей выросло с десятка до сотен вариантов. Claude 3.5 Sonnet показывает лучшие результаты в reasoning, GPT-4 Turbo оптимален для structured output, DeepSeek-V3 предлагает выгодное соотношение цена-качество для азиатских языков, а Gemini 2.0 Flash демонстрирует высокую скорость при работе с мультимодальным контентом.
Проблема выбора единственной модели становится критичной для production-систем. Каждая модель имеет свои сильные стороны, ограничения по rate limits, различную стоимость токенов и периодические downtime. Разработчики сталкиваются с необходимостью поддерживать интеграции с множеством провайдеров: OpenAI, Anthropic, Google, Mistral, Cohere и десятками других.
OpenRouter решает эту проблему, предоставляя единый API-шлюз к 200+ моделям от различных провайдеров. Однако простое использование OpenRouter как proxy недостаточно для enterprise-решений. Требуется дополнительный слой абстракции, который обеспечит:
- Унифицированный интерфейс для text, image и video generation с нормализацией параметров
- Интеллектуальный failover между моделями при недоступности или превышении rate limits
- Финансовую оптимизацию через динамический выбор модели по критерию цена-качество
- Надёжность через retry-механизмы с exponential backoff и idempotency
- Снижение затрат через кеширование повторяющихся промптов
В нашей системе мы агрегируем 25 наиболее востребованных моделей через OpenRouter API, обрабатывая 2.3 миллиона запросов ежемесячно с uptime 99.7%. Средняя latency составляет 1.8 секунды для text generation и 12 секунд для image synthesis. В этой статье разберём архитектурные решения, которые позволили достичь таких показателей.
Архитектура системы агрегации
Система построена на трёхуровневой архитектуре:
Уровень 1: API Gateway — принимает запросы от клиентов, выполняет аутентификацию через JWT, валидацию входных параметров и rate limiting на уровне пользователя. Реализован на FastAPI с использованием Redis для хранения токенов и счётчиков запросов. Средняя задержка на этом уровне составляет 15-20 мс.
Уровень 2: Orchestration Layer — ядро системы, отвечающее за выбор оптимальной модели, failover-логику, retry-механизмы и финансовый учёт. Здесь происходит конвертация пользовательских кредитов в стоимость вызова конкретной модели, логирование метрик и управление idempotency keys. Реализован как набор Python-сервисов с использованием Celery для асинхронных задач.
Уровень 3: Provider Adapters — адаптеры для взаимодействия с OpenRouter API. Каждый адаптер нормализует запросы и ответы под специфику конкретной модели, обрабатывает streaming responses и конвертирует ошибки в унифицированный формат. Используем aiohttp для асинхронных HTTP-запросов с connection pooling.
Все компоненты взаимодействуют через message queue (RabbitMQ) для обеспечения горизонтального масштабирования. PostgreSQL используется для хранения истории запросов, финансовых транзакций и метрик производительности моделей. Redis Cluster обеспечивает distributed caching для промптов и результатов с TTL от 1 часа до 7 дней в зависимости от типа контента.
Унификация интерфейса: text, image, video
Ключевая задача агрегатора — предоставить единообразный API независимо от типа генерируемого контента. OpenRouter поддерживает различные модели с разными форматами запросов: text-generation модели используют chat completion format, image-модели требуют промпты в виде текстовых описаний, video-модели работают с дополнительными параметрами длительности и разрешения.
Мы разработали унифицированный формат запроса:
{
"type": "text|image|video",
"prompt": "User input or system message",
"context": ["previous", "messages"],
"parameters": {
"temperature": 0.7,
"max_tokens": 2048,
"top_p": 0.9
},
"preferences": {
"model_priority": ["claude-3.5-sonnet", "gpt-4-turbo"],
"max_cost_credits": 100,
"latency_priority": "balanced"
},
"idempotency_key": "uuid-v4-string"
}
Поле type определяет маршрутизацию запроса. Для text используются модели Claude, GPT, Gemini, Llama. Для image — DALL-E 3, Stable Diffusion XL, Midjourney через OpenRouter. Для video — пока только экспериментальные модели вроде Runway Gen-2 и Pika Labs, доступные через специальные эндпоинты.
Параметр preferences позволяет клиенту указать приоритеты. Если model_priority содержит список моделей, система попытается использовать их в указанном порядке при failover. Параметр max_cost_credits ограничивает максимальную стоимость запроса в внутренних кредитах. Latency_priority может быть "speed" (приоритет скорости), "quality" (приоритет качества) или "balanced".
Внутри Orchestration Layer происходит трансформация этого запроса в формат OpenRouter API с добавлением необходимых заголовков, выбором конкретной модели и установкой параметров retry.
Модель ценообразования и конвертация в кредиты
Различные модели имеют разную стоимость. OpenRouter предоставляет pricing в долларах за миллион токенов (input и output отдельно). Для упрощения биллинга мы ввели внутреннюю валюту — кредиты, где 1 кредит = 0.001 USD.
| Модель | Input ($/1M tokens) | Output ($/1M tokens) | Средний запрос (tokens) | Стоимость (кредиты) | Use Case |
|---|---|---|---|---|---|
| Claude 3.5 Sonnet | 3.00 | 15.00 | 1500 in / 500 out | 12.0 | Complex reasoning, code generation |
| GPT-4 Turbo | 10.00 | 30.00 | 1200 in / 400 out | 24.0 | Structured output, function calling |
| GPT-4o mini | 0.15 | 0.60 | 1000 in / 300 out | 0.33 | High-volume simple tasks |
| DeepSeek-V3 | 0.27 | 1.10 | 1400 in / 600 out | 1.04 | Multilingual, cost-effective |
| Gemini 2.0 Flash | 0.10 | 0.40 | 1100 in / 350 out | 0.25 | Fast responses, multimodal |
| Llama 3.1 405B | 2.80 | 2.80 | 1300 in / 450 out | 4.90 | Open-source alternative |
| DALL-E 3 HD | — | — | 1 image 1024x1024 | 80.0 | High-quality image generation |
| SDXL Turbo | — | — | 1 image 1024x1024 | 4.0 | Fast image generation |
Конвертация происходит в реальном времени на основе актуальных цен OpenRouter. Мы поддерживаем кеш pricing data с обновлением каждые 6 часов через их API эндпоинт /api/v1/models. При расчёте стоимости учитывается:
- Количество input и output токенов (для text моделей используем tiktoken для предварительной оценки)
- Фиксированная стоимость для image/video моделей в зависимости от разрешения
- Наценка системы (маржа) в диапазоне 15-40% в зависимости от модели
- Скидки для пользователей с высоким объёмом (volume discount от 10% при >100k кредитов/месяц)
Финансовая транзакция создаётся до выполнения запроса (pre-authorization), блокируя необходимое количество кредитов. После получения ответа от модели происходит финальный расчёт на основе фактического количества токенов и либо списание точной суммы, либо возврат разницы.
Failover-стратегия и приоритизация моделей
Недоступность модели может быть вызвана несколькими причинами: rate limit провайдера, временный downtime, превышение capacity, или блокировка по географическому признаку. OpenRouter возвращает специфичные коды ошибок: 429 для rate limits, 503 для unavailability, 500 для internal errors.
Наша failover-стратегия основана на трёхуровневой приоритизации:
Уровень 1: User Preferences — если пользователь явно указал model_priority, система последовательно пробует модели из этого списка. Например, ["claude-3.5-sonnet", "gpt-4-turbo", "deepseek-v3"] означает: сначала Claude, при недоступности — GPT-4, затем DeepSeek.
Уровень 2: Task-Based Selection — если preferences не указаны, система анализирует тип задачи. Для code generation приоритет отдаётся Claude 3.5 Sonnet и GPT-4. Для multilingual content — DeepSeek-V3 и Gemini. Для structured output — GPT-4 Turbo с JSON mode. Выбор основан на бенчмарках, которые мы регулярно обновляем.
Уровень 3: Cost-Performance Optimization — если несколько моделей подходят по качеству, выбирается наиболее экономичная с учётом latency_priority. Для "speed" приоритет у Gemini 2.0 Flash и GPT-4o mini. Для "quality" — Claude 3.5 Sonnet и GPT-4 Turbo. Для "balanced" используется scoring formula:
score = (quality_benchmark / 100) * 0.6 - (cost_per_request / 100) * 0.3 - (avg_latency_sec / 10) * 0.1
При получении ошибки 429 (rate limit) система автоматически переключается на следующую модель в приоритете без retry на текущей. Для ошибок 503 и 500 выполняется один retry с задержкой 2 секунды, затем failover. Для ошибок 4xx (client errors) failover не выполняется, ошибка возвращается клиенту.
Мы ведём статистику доступности моделей в реальном времени. Если модель показывает availability ниже 95% за последний час, её приоритет временно понижается. Это предотвращает каскадные failover на проблемные модели.
Retry-политики и idempotency keys
Сетевые запросы к AI-моделям могут занимать от 1 до 30 секунд. За это время возможны timeouts, разрывы соединения, временные недоступности. Retry-механизм критичен для обеспечения надёжности, но создаёт риск дублирования запросов и двойного списания средств.
Мы реализовали retry-политику с exponential backoff и jitter:
import asyncio
import aiohttp
from typing import Optional, Dict, Any
import hashlib
import time
import random
class OpenRouterClient:
def __init__(self, api_key: str, base_url: str = "https://openrouter.ai/api/v1"):
self.api_key = api_key
self.base_url = base_url
self.session: Optional[aiohttp.ClientSession] = None
# Конфигурация retry: максимум 3 попытки, base delay 1 сек
self.max_retries = 3
self.base_delay = 1.0
self.max_delay = 16.0
# Timeout для запросов: 30 сек на соединение, 120 сек на чтение
self.timeout = aiohttp.ClientTimeout(total=150, connect=30, sock_read=120)
async def __aenter__(self):
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers={
"Authorization": f"Bearer {self.api_key}",
"HTTP-Referer": "https://yourdomain.com",
"X-Title": "AI Aggregator Service"
}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
def _generate_idempotency_key(self, request_data: Dict[str, Any]) -> str:
"""
Генерация idempotency key на основе хеша запроса.
Используется SHA-256 от JSON-сериализованных данных.
"""
import json
# Сортируем ключи для детерминированности
canonical = json.dumps(request_data, sort_keys=True)
return hashlib.sha256(canonical.encode()).hexdigest()
def _calculate_backoff(self, attempt: int) -> float:
"""
Exponential backoff с jitter: delay = min(base * 2^attempt + random, max_delay)
Jitter предотвращает thundering herd при массовых retry.
"""
delay = min(self.base_delay * (2 ** attempt), self.max_delay)
jitter = random.uniform(0, delay * 0.1) # 10% jitter
return delay + jitter
async def _should_retry(self, status: int, response_data: Dict) -> bool:
"""
Определяет, нужно ли повторять запрос на основе статуса и тела ответа.
Retry для: 429 (rate limit), 500, 502, 503, 504 (server errors),
timeout errors. Не retry для 4xx client errors (кроме 429).
"""
if status == 429:
return True
if 500 <= status < 600:
return True
# Проверяем специфичные коды ошибок OpenRouter
error_code = response_data.get("error", {}).get("code")
if error_code in ["rate_limit_exceeded", "model_unavailable", "timeout"]:
return True
return False
async def complete(
self,
model: str,
messages: list,
temperature: float = 0.7,
max_tokens: int = 2048,
idempotency_key: Optional[str] = None
) -> Dict[str, Any]:
"""
Выполняет chat completion запрос с retry-логикой и idempotency.
Args:
model: Идентификатор модели (например, "anthropic/claude-3.5-sonnet")
messages: Список сообщений в формате OpenAI chat completion
temperature: Параметр креативности (0.0-2.0)
max_tokens: Максимальное количество токенов в ответе
idempotency_key: Ключ идемпотентности (опционально)
Returns:
Словарь с ответом модели и метаданными
"""
request_data = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
# Генерируем idempotency key если не предоставлен
if not idempotency_key:
idempotency_key = self._generate_idempotency_key(request_data)
headers = {"X-Idempotency-Key": idempotency_key}
last_exception = None
for attempt in range(self.max_retries):
try:
start_time = time.time()
async with self.session.post(
f"{self.base_url}/chat/completions",
json=request_data,
headers=headers
) as response:
latency = time.time() - start_time
response_data = await response.json()
if response.status == 200:
# Успешный ответ, добавляем метаданные
response_data["_metadata"] = {
"latency_sec": round(latency, 3),
"attempt": attempt + 1,
"idempotency_key": idempotency_key
}
return response_data
# Проверяем необходимость retry
if attempt < self.max_retries - 1:
should_retry = await self._should_retry(response.status, response_data)
if should_retry:
delay = self._calculate_backoff(attempt)
print(f"Retry attempt {attempt + 1} after {delay:.2f}s for status {response.status}")
await asyncio.sleep(delay)
continue
# Финальная ошибка без retry
raise Exception(f"API error {response.status}: {response_data}")
except asyncio.TimeoutError as e:
last_exception = e
if attempt < self.max_retries - 1:
delay = self._calculate_backoff(attempt)
print(f"Timeout on attempt {attempt + 1}, retrying after {delay:.2f}s")
await asyncio.sleep(delay)
continue
except aiohttp.ClientError as e:
last_exception = e
if attempt < self.max_retries - 1:
delay = self._calculate_backoff(attempt)
print(f"Network error on attempt {attempt + 1}, retrying after {delay:.2f}s")
await asyncio.sleep(delay)
continue
# Все попытки исчерпаны
raise Exception(f"Max retries exceeded. Last error: {last_exception}")
# Пример использования
async def example_usage():
async with OpenRouterClient(api_key="your-api-key") as client:
response = await client.complete(
model="anthropic/claude-3.5-sonnet",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain quantum entanglement in simple terms."}
],
temperature=0.7,
max_tokens=500,
idempotency_key="unique-request-id-12345"
)
print(f"Response: {response['choices'][0]['message']['content']}")
print(f"Latency: {response['_metadata']['latency_sec']}s")
Ключевые моменты реализации:
- Idempotency key передаётся в заголовке X-Idempotency-Key и сохраняется на стороне OpenRouter на 24 часа
- При повторном запросе с тем же ключом возвращается кешированный результат без повторного выполнения
- Exponential backoff начинается с 1 секунды и удваивается с каждой попыткой до максимума 16 секунд
- Jitter в 10% предотвращает синхронизацию retry от множества клиентов
- Timeout настроен с учётом длительности генерации: 30 сек на установку соединения, 120 сек на чтение ответа
- Метаданные запроса (latency, количество попыток) добавляются к ответу для мониторинга
В production мы дополнительно логируем все retry в Prometheus для анализа паттернов недоступности и настройки алертов при превышении порога retry rate.
Кеширование промптов для оптимизации затрат
Анализ наших запросов показал, что около 35% промптов повторяются с небольшими вариациями. Типичные сценарии: одинаковые system prompts для разных пользовательских запросов, повторные запросы на summarization одного и того же документа, идентичные image generation промпты.
Мы реализовали двухуровневое кеширование:
L1 Cache: Exact Match — полное совпадение промпта и параметров. Используем Redis с ключом вида cache:prompt:{sha256_hash}. TTL зависит от типа контента: 1 час для text generation (модели часто обновляются), 24 часа для image generation (результат более стабилен), 7 дней для embeddings (детерминированы).
L2 Cache: Semantic Similarity — для text prompts вычисляем embedding через lightweight модель (all-MiniLM-L6-v2, 384 dimensions) и ищем похожие промпты с cosine similarity > 0.95. Если найден, возвращаем кешированный результат с пометкой "similar_prompt_used". Это даёт дополнительные 12% cache hit rate.
Кеширование применяется только для deterministic параметров (temperature=0, top_p=1.0). Для creative generation с temperature > 0 кеш не используется, так как ожидается вариативность ответов.
Экономия от кеширования составляет около 18% от общих расходов на API calls. Для high-volume клиентов (более 100k запросов/месяц) мы предлагаем опцию aggressive caching с увеличенным TTL и более низким порогом similarity (0.90), что даёт экономию до 30%.
Финансовая модель и маржинальность
Финансовая модель агрегатора строится на балансе между конкурентоспособным pricing для клиентов и достаточной маржой для покрытия операционных расходов и развития.
Структура затрат на один миллион запросов:
- Стоимость API calls к OpenRouter: 65-70% от выручки (зависит от mix моделей)
- Инфраструктура (серверы, БД, кеш): 8-10%
- Мониторинг и логирование: 2-3%
- Support и операционные расходы: 5-7%
- Целевая маржа: 15-20%
Маржинальность варьируется по моделям. На премиум-моделях (Claude 3.5 Sonnet, GPT-4 Turbo) мы устанавливаем наценку 15-20%, так как клиенты готовы платить за качество и не так чувствительны к цене. На budget-моделях (GPT-4o mini, Gemini Flash) наценка 30-40%, так как абсолютная разница в центах незначительна, но процентная маржа выше.
Для image generation наценка составляет 25-35%, так как запросы менее частотны, но более ресурсоёмки в обработке (требуется больше времени на генерацию, выше риск неуспешных попыток).
Мы используем динамическое ценообразование на основе реальных затрат. Каждые 6 часов система пересчитывает стоимость в кредитах на основе актуальных цен OpenRouter и текущего exchange rate USD. Это защищает от волатильности цен провайдеров.
Для enterprise-клиентов доступны кастомные pricing plans с фиксированной ценой за период (monthly commitment) и volume discounts:
- 10% скидка при commitment > 100,000 кредитов/месяц
- 15% скидка при commitment > 500,000 кредитов/месяц
- 20% скидка при commitment > 1,000,000 кредитов/месяц
- Индивидуальные условия для корпоративных клиентов с объёмом > 5M кредитов/месяц
Финансовая отчётность строится в реальном времени. Dashboard показывает текущую маржу по каждой модели, топ-10 клиентов по выручке, distribution запросов по моделям и прогноз расходов на текущий месяц на основе трендов.
Практическая реализация на Python
Рассмотрим практическую реализацию Orchestration Layer с failover-логикой и выбором оптимальной модели:
import asyncio
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from enum import Enum
import time
class ModelProvider(Enum):
ANTHROPIC = "anthropic"
OPENAI = "openai"
GOOGLE = "google"
DEEPSEEK = "deepseek"
META = "meta"
@dataclass
class ModelConfig:
"""Конфигурация модели с метриками производительности и стоимости."""
id: str
provider: ModelProvider
input_cost_per_1m: float # USD за 1M input tokens
output_cost_per_1m: float # USD за 1M output tokens
avg_latency_sec: float
quality_score: float # 0-100, на основе бенчмарков
availability: float # 0-1, текущая доступность за последний час
context_window: int
supports_streaming: bool
def calculate_cost_credits(self, input_tokens: int, output_tokens: int) -> float:
"""Расчёт стоимости в кредитах (1 кредит = 0.001 USD)."""
cost_usd = (
(input_tokens / 1_000_000) * self.input_cost_per_1m +
(output_tokens / 1_000_000) * self.output_cost_per_1m
)
return cost_usd * 1000 # конвертация в кредиты
def calculate_score(self, latency_priority: str) -> float:
"""
Расчёт комплексного score для ранжирования моделей.
Args:
latency_priority: 'speed', 'quality', или 'balanced'
Returns:
Score для сортировки (выше = лучше)
"""
if latency_priority == "speed":
weights = {"quality": 0.3, "cost": 0.2, "latency": 0.5}
elif latency_priority == "quality":
weights = {"quality": 0.7, "cost": 0.1, "latency": 0.2}
else: # balanced
weights = {"quality": 0.5, "cost": 0.3, "latency": 0.2}
# Нормализация метрик (инвертируем cost и latency, так как меньше = лучше)
quality_norm = self.quality_score / 100
cost_norm = 1.0 - min(self.input_cost_per_1m / 10.0, 1.0)
latency_norm = 1.0 - min(self.avg_latency_sec / 5.0, 1.0)
score = (
quality_norm * weights["quality"] +
cost_norm * weights["cost"] +
latency_norm * weights["latency"]
)
# Штраф за низкую доступность
score *= self.availability
return score
class ModelOrchestrator:
"""Оркестратор для выбора и failover между моделями."""
def __init__(self, openrouter_client):
self.client = openrouter_client
self.models = self._initialize_models()
self.availability_cache = {} # Кеш доступности моделей
def _initialize_models(self) -> Dict[str, ModelConfig]:
"""Инициализация конфигураций моделей с актуальными метриками."""
return {
"claude-3.5-sonnet": ModelConfig(
id="anthropic/claude-3.5-sonnet",
provider=ModelProvider.ANTHROPIC,
input_cost_per_1m=3.0,
output_cost_per_1m=15.0,
avg_latency_sec=2.1,
quality_score=95,
availability=0.98,
context_window=200000,
supports_streaming=True
),
"gpt-4-turbo": ModelConfig(
id="openai/gpt-4-turbo",
provider=ModelProvider.OPENAI,
input_cost_per_1m=10.0,
output_cost_per_1m=30.0,
avg_latency_sec=2.5,
quality_score=93,
availability=0.97,
context_window=128000,
supports_streaming=True
),
"gpt-4o-mini": ModelConfig(
id="openai/gpt-4o-mini",
provider=ModelProvider.OPENAI,
input_cost_per_1m=0.15,
output_cost_per_1m=0.60,
avg_latency_sec=1.2,
quality_score=78,
availability=0.99,
context_window=128000,
supports_streaming=True
),
"deepseek-v3": ModelConfig(
id="deepseek/deepseek-chat",
provider=ModelProvider.DEEPSEEK,
input_cost_per_1m=0.27,
output_cost_per_1m=1.10,
avg_latency_sec=1.8,
quality_score=82,
availability=0.96,
context_window=64000,
supports_streaming=True
),
"gemini-2.0-flash": ModelConfig(
id="google/gemini-2.0-flash",
provider=ModelProvider.GOOGLE,
input_cost_per_1m=0.10,
output_cost_per_1m=0.40,
avg_latency_sec=0.9,
quality_score=80,
availability=0.98,
context_window=1000000,
supports_streaming=True
),
}
def _select_models(
self,
user_priority: Optional[List[str]],
latency_priority: str,
max_cost_credits: Optional[float],
estimated_tokens: int
) -> List[ModelConfig]:
"""
Выбор и ранжирование моделей для failover chain.
Args:
user_priority: Список предпочитаемых моделей от пользователя
latency_priority: Приоритет latency ('speed', 'quality', 'balanced')
max_cost_credits: Максимальная стоимость в кредитах
estimated_tokens: Оценочное количество токенов для фильтрации
Returns:
Отсортированный список ModelConfig для попыток
"""
candidates = []
if user_priority:
# Используем пользовательский приоритет
for model_key in user_priority:
if model_key in self.models:
model = self.models[model_key]
# Проверяем cost constraint
if max_cost_credits:
estimated_cost = model.calculate_cost_credits(
estimated_tokens, estimated_tokens // 2
)
if estimated_cost > max_cost_credits:
continue
candidates.append(model)
else:
# Автоматический выбор на основе scoring
for model in self.models.values():
if max_cost_credits:
estimated_cost = model.calculate_cost_credits(
estimated_tokens, estimated_tokens // 2
)
if estimated_cost > max_cost_credits:
continue
candidates.append(model)
# Сортируем по score
candidates.sort(
key=lambda m: m.calculate_score(latency_priority),
reverse=True
)
return candidates[:5] # Максимум 5 моделей в failover chain
async def complete_with_failover(
self,
messages: List[Dict[str, str]],
temperature: float = 0.7,
max_tokens: int = 2048,
user_priority: Optional[List[str]] = None,
latency_priority: str = "balanced",
max_cost_credits: Optional[float] = None,
idempotency_key: Optional[str] = None
) -> Dict[str, Any]:
"""
Выполняет запрос с автоматическим failover между моделями.
Returns:
Словарь с ответом, метаданными и информацией о использованной модели
"""
# Оценка количества токенов (упрощённо: ~4 символа = 1 токен)
estimated_tokens = sum(len(m.get("content", "")) for m in messages) // 4
# Выбор моделей для failover
models_to_try = self._select_models(
user_priority, latency_priority, max_cost_credits, estimated_tokens
)
if not models_to_try:
raise Exception("No suitable models found for the given constraints")
last_error = None
attempts_log = []
for idx, model_config in enumerate(models_to_try):
try:
start_time = time.time()
# Выполняем запрос через OpenRouter client
response = await self.client.complete(
model=model_config.id,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
idempotency_key=idempotency_key
)
# Успешный ответ
actual_latency = time.time() - start_time
# Расчёт фактической стоимости
usage = response.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
cost_credits = model_config.calculate_cost_credits(
input_tokens, output_tokens
)
# Добавляем расширенные метаданные
response["_orchestrator_metadata"] = {
"model_used": model_config.id,
"provider": model_config.provider.value,
"failover_attempt": idx + 1,
"total_attempts": len(attempts_log) + 1,
"actual_latency_sec": round(actual_latency, 3),
"cost_credits": round(cost_credits, 4),
"attempts_log": attempts_log
}
return response
except Exception as e:
last_error = e
attempts_log.append({
"model": model_config.id,
"error": str(e),
"timestamp": time.time()
})
# Логируем недоступность для обновления availability метрики
self._log_model_failure(model_config.id)
# Если это не последняя модель, продолжаем failover
if idx < len(models_to_try) - 1:
print(f"Failover from {model_config.id} to next model due to: {e}")
await asyncio.sleep(0.5) # Небольшая задержка между failover
continue
# Все модели failed
raise Exception(
f"All {len(models_to_try)} models failed. Last error: {last_error}. "
f"Attempts: {attempts_log}"
)
def _log_model_failure(self, model_id: str):
"""Логирование недоступности модели для обновления availability метрики."""
# В production здесь отправка в monitoring систему (Prometheus, DataDog)
print(f"Model failure logged: {model_id} at {time.time()}")
# Пример использования orchestrator
async def example_orchestrator_usage():
from openrouter_client import OpenRouterClient # Из предыдущего примера
async with OpenRouterClient(api_key="your-api-key") as client:
orchestrator = ModelOrchestrator(client)
# Запрос с пользовательским приоритетом моделей
response = await orchestrator.complete_with_failover(
messages=[
{"role": "system", "content": "You are a code review assistant."},
{"role": "user", "content": "Review this Python function for bugs and improvements:\n\ndef calculate_average(numbers):\n return sum(numbers) / len(numbers)"}
],
temperature=0.3,
max_tokens=1000,
user_priority=["claude-3.5-sonnet", "gpt-4-turbo"],
latency_priority="quality",
max_cost_credits=50.0
)
print(f"Model used: {response['_orchestrator_metadata']['model_used']}")
print(f"Cost: {response['_orchestrator_metadata']['cost_credits']} credits")
print(f"Latency: {response['_orchestrator_metadata']['actual_latency_sec']}s")
print(f"Response: {response['choices'][0]['message']['content']}")
Эта реализация демонстрирует ключевые паттерны production-системы: scoring-based model selection, cost constraints, automatic failover с логированием попыток, расчёт фактической стоимости и latency. В реальной системе дополнительно требуется интеграция с monitoring, persistent storage для availability metrics и более сложная логика оценки токенов.
Мониторинг и метрики производительности
Для обеспечения SLA 99.7% uptime критически важен comprehensive monitoring на всех уровнях системы. Мы собираем метрики в Prometheus и визуализируем в Grafana с настроенными алертами в PagerDuty.
Ключевые метрики по категориям:
Availability Metrics:
- Uptime по каждой модели (rolling 1h, 24h, 7d windows)
- Rate limit hits per model per hour
- Failover rate (процент запросов, потребовавших failover)
- Success rate после всех retry попыток
Performance Metrics:
- P50, P95, P99 latency по каждой модели и типу запроса
- Time to first token (для streaming responses)
- Tokens per second throughput
- Queue depth в Orchestration Layer
Financial Metrics:
- Cost per 1k requests по моделям
- Revenue vs cost margin в реальном времени
- Credits consumption rate по клиентам
- Cache hit rate и savings from caching
Quality Metrics:
- Error rate по типам (4xx client errors, 5xx server errors, timeouts)
- Retry rate и average retries per request
- Idempotency key collision rate
- User satisfaction score (на основе feedback API)
Алерты настроены на следующие условия:
- Critical: любая модель с availability < 90% за последние 15 минут
- Critical: overall success rate < 95% за последние 5 минут
- Warning: P95 latency > 5 секунд для любой модели
- Warning: failover rate > 15% за последний час
- Info: cache hit rate < 20% (возможна оптимизация)
Distributed tracing реализован через OpenTelemetry с экспортом в Jaeger. Каждый запрос получает trace_id, который проходит через все компоненты системы, позволяя анализировать bottlenecks и debugging проблем производительности.
Выводы и дальнейшее развитие
Агрегация 25 AI-моделей через OpenRouter API позволила нам создать resilient и cost-effective систему для production workloads. Ключевые достижения:
- Uptime 99.7% благодаря intelligent failover между моделями
- Снижение затрат на 18% через prompt caching и оптимальный выбор моделей
- Средняя latency 1.8 секунды для text generation при обработке 2.3M запросов/месяц
- Маржинальность 15-20% при конкурентоспособном pricing
Архитектурные решения, которые оказались критичными:
- Трёхуровневая архитектура с чётким разделением ответственности между Gateway, Orchestration и Adapters
- Retry-механизм с exponential backoff и jitter для предотвращения thundering herd
- Idempotency keys для гарантии exactly-once семантики при сетевых проблемах
- Двухуровневое кеширование (exact match + semantic similarity) для оптимизации затрат
- Scoring-based model selection с учётом качества, стоимости и latency
Направления дальнейшего развития:
Расширение модельного ряда: добавление специализированных моделей для domain-specific задач (медицина, юриспруденция, финансы), интеграция video generation моделей следующего поколения, поддержка multimodal моделей с native image understanding.
Улучшение failover-логики: machine learning для предсказания вероятности успеха запроса на конкретной модели на основе исторических данных, adaptive retry delays на основе текущей загрузки провайдеров, circuit breaker pattern для временного исключения проблемных моделей.
Оптимизация затрат: dynamic batching для группировки похожих запросов, более агрессивное кеширование для enterprise-клиентов, negotiation прямых контрактов с провайдерами для volume discounts.
Улучшение developer experience: SDK для популярных языков (Python, JavaScript, Go, Java), webhooks для асинхронной обработки длительных запросов, playground для тестирования промптов с разными моделями.
Исходный код базовых компонентов системы доступен в нашем GitHub репозитории. Для production deployment рекомендуем дополнительно реализовать rate limiting на уровне пользователей, comprehensive logging с retention policies и backup стратегию для критичных данных.
Опыт эксплуатации системы показал, что агрегация AI-моделей через единый API — это не просто convenience feature, а необходимость для построения надёжных и экономически эффективных AI-powered приложений. Правильная архитектура с failover, retry и caching позволяет достичь enterprise-grade reliability при работе с inherently unreliable external APIs.
Готовы попробовать AvatarBox?
Создать первое видео бесплатно