Systems Library / AI Model Setup / How to Create AI Usage Analytics and Reporting
AI Model Setup routing optimization

How to Create AI Usage Analytics and Reporting

Build analytics to understand which teams, tasks, and models drive your AI costs.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

After three months of running AI across five different client workflows, I had no idea which team was using what, which task types were the heaviest consumers, or which model choices were actually saving money versus wasting it. I had costs but no insight. Building ai api usage analytics reporting gave me the visibility to cut 28% of spend in a single week by identifying two workflows that were using GPT-4o for tasks that Haiku handled just as well.

The difference between a cost dashboard and usage analytics is causality. A dashboard tells you how much you spent. Analytics tells you why, and more importantly, what to do about it.

What You Need Before Starting

Step 1: Extend Your Usage Schema

If you built the cost dashboard already, extend that table. If starting fresh, use this schema.

import sqlite3

DB_PATH = "ai_analytics.db"

def init_db():
    conn = sqlite3.connect(DB_PATH)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS ai_usage (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            ts              TEXT NOT NULL,
            provider        TEXT NOT NULL,
            model           TEXT NOT NULL,
            team            TEXT NOT NULL,
            task_type       TEXT NOT NULL,
            workflow        TEXT,
            input_tokens    INTEGER DEFAULT 0,
            output_tokens   INTEGER DEFAULT 0,
            cost_usd        REAL NOT NULL,
            latency_ms      INTEGER,
            quality_score   REAL,
            passed_quality  INTEGER,
            request_id      TEXT,
            session_id      TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_ts       ON ai_usage(ts);
        CREATE INDEX IF NOT EXISTS idx_team     ON ai_usage(team);
        CREATE INDEX IF NOT EXISTS idx_workflow ON ai_usage(workflow);
        CREATE INDEX IF NOT EXISTS idx_model    ON ai_usage(model);
    """)
    conn.commit()
    conn.close()

The additions over a basic cost table: workflow (named pipeline), quality_score and passed_quality (from quality scoring), and session_id (groups a user or job session together).

Step 2: Build the Core Analytics Queries

These four queries answer the most important operational questions.

def query(sql: str, params: tuple = ()) -> list[dict]:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute(sql, params).fetchall()
    conn.close()
    return [dict(r) for r in rows]

def cost_by_team(days: int = 30) -> list[dict]:
    return query("""
        SELECT team,
               SUM(cost_usd) as total_cost,
               COUNT(*) as requests,
               SUM(input_tokens + output_tokens) as tokens,
               AVG(quality_score) as avg_quality
        FROM ai_usage
        WHERE ts >= datetime('now', ?)
        GROUP BY team
        ORDER BY total_cost DESC
    """, (f'-{days} days',))

def cost_by_workflow(days: int = 30) -> list[dict]:
    return query("""
        SELECT workflow, task_type, model,
               SUM(cost_usd) as total_cost,
               COUNT(*) as requests,
               AVG(cost_usd) as avg_cost_per_call,
               AVG(latency_ms) as avg_latency,
               AVG(quality_score) as avg_quality,
               SUM(CASE WHEN passed_quality THEN 1 ELSE 0 END) * 1.0 / COUNT(*) as pass_rate
        FROM ai_usage
        WHERE ts >= datetime('now', ?)
        GROUP BY workflow, task_type, model
        ORDER BY total_cost DESC
    """, (f'-{days} days',))

def model_efficiency(days: int = 30) -> list[dict]:
    return query("""
        SELECT model,
               SUM(cost_usd) as total_cost,
               AVG(cost_usd) as avg_cost,
               AVG(latency_ms) as avg_latency,
               AVG(quality_score) as avg_quality,
               COUNT(*) as requests
        FROM ai_usage
        WHERE ts >= datetime('now', ?) AND quality_score IS NOT NULL
        GROUP BY model
        ORDER BY avg_quality DESC, avg_cost
    """, (f'-{days} days',))

def daily_trend(days: int = 30) -> list[dict]:
    return query("""
        SELECT DATE(ts) as day,
               SUM(cost_usd) as cost,
               COUNT(*) as requests,
               COUNT(DISTINCT team) as active_teams
        FROM ai_usage
        WHERE ts >= datetime('now', ?)
        GROUP BY DATE(ts)
        ORDER BY day
    """, (f'-{days} days',))

Step 3: Build Model Substitution Analysis

This is the analysis that finds the money. It identifies which tasks are using expensive models when a cheaper model would perform equally well.

def model_substitution_opportunities() -> list[dict]:
    """Find workflows where expensive models have similar quality to cheaper ones."""
    results = query("""
        SELECT workflow, task_type,
               model,
               AVG(quality_score) as avg_quality,
               AVG(cost_usd) as avg_cost,
               COUNT(*) as sample_size
        FROM ai_usage
        WHERE quality_score IS NOT NULL
          AND sample_size >= 20
        GROUP BY workflow, task_type, model
        HAVING sample_size >= 20
        ORDER BY workflow, task_type, avg_cost DESC
    """)
    
    # Group by workflow+task and find pairs where quality is within 5% but cost differs by 20%+
    opportunities = []
    from itertools import groupby
    
    key_fn = lambda r: (r["workflow"], r["task_type"])
    for key, group_rows in groupby(sorted(results, key=key_fn), key=key_fn):
        rows = list(group_rows)
        if len(rows) < 2:
            continue
        
        most_expensive = max(rows, key=lambda x: x["avg_cost"])
        cheapest = min(rows, key=lambda x: x["avg_cost"])
        
        if most_expensive["model"] == cheapest["model"]:
            continue
        
        quality_diff = abs((most_expensive["avg_quality"] or 0) - (cheapest["avg_quality"] or 0))
        cost_ratio = most_expensive["avg_cost"] / max(cheapest["avg_cost"], 0.0001)
        
        if quality_diff < 0.05 and cost_ratio > 1.2:
            monthly_savings = (most_expensive["avg_cost"] - cheapest["avg_cost"]) * most_expensive["sample_size"] * (30 / 7)
            opportunities.append({
                "workflow": key[0], "task": key[1],
                "current_model": most_expensive["model"],
                "better_model": cheapest["model"],
                "quality_diff": round(quality_diff, 3),
                "cost_ratio": round(cost_ratio, 2),
                "est_monthly_savings": round(monthly_savings, 4)
            })
    
    return sorted(opportunities, key=lambda x: x["est_monthly_savings"], reverse=True)

Step 4: Build the Weekly Report Generator

A report that summarizes the week's usage and flags anything worth acting on.

from datetime import datetime

def generate_weekly_report() -> str:
    lines = [
        f"# AI Usage Report — Week of {datetime.utcnow().strftime('%Y-%m-%d')}",
        "",
        "## Cost by Team (Last 7 Days)",
    ]
    
    for row in cost_by_team(7):
        lines.append(f"- {row['team']}: ${row['total_cost']:.4f} ({row['requests']} requests)")
    
    lines += ["", "## Top Workflows by Cost", ]
    for row in cost_by_workflow(7)[:5]:
        lines.append(f"- {row['workflow']} / {row['task_type']} [{row['model']}]: "
                     f"${row['total_cost']:.4f}, pass rate {row['pass_rate']*100:.0f}%")
    
    lines += ["", "## Model Efficiency", ]
    for row in model_efficiency(7):
        lines.append(f"- {row['model']}: avg quality {row['avg_quality']:.2f}, "
                     f"avg cost ${row['avg_cost']:.6f}, {row['requests']} calls")
    
    opps = model_substitution_opportunities()
    if opps:
        lines += ["", "## Cost Reduction Opportunities", ]
        for opp in opps[:3]:
            lines.append(f"- {opp['workflow']}/{opp['task']}: switch {opp['current_model']} "
                         f"to {opp['better_model']}, est. savings ${opp['est_monthly_savings']:.2f}/mo "
                         f"(quality diff: {opp['quality_diff']})")
    
    return "\n".join(lines)

Step 5: Send the Weekly Report to Slack

Automate the report delivery so you never have to pull it manually.

import requests, os

def send_slack_report():
    report = generate_weekly_report()
    webhook = os.getenv("SLACK_WEBHOOK_URL")
    if webhook:
        requests.post(webhook, json={"text": f"```{report}```"}, timeout=10)
        print("Report sent to Slack")
    else:
        print(report)

# crontab: 0 9 * * 1 python /scripts/ai_weekly_report.py
if __name__ == "__main__":
    send_slack_report()

Step 6: Build a Team Self-Service API

Give teams a simple API to query their own usage so they don't have to ask you.

from flask import Flask, jsonify, request as flask_request

app = Flask(__name__)

@app.route("/usage/team/<team_name>")
def team_usage(team_name: str):
    days = int(flask_request.args.get("days", 30))
    rows = query("""
        SELECT DATE(ts) as day, SUM(cost_usd) as cost, COUNT(*) as requests
        FROM ai_usage WHERE team = ? AND ts >= datetime('now', ?)
        GROUP BY DATE(ts) ORDER BY day
    """, (team_name, f'-{days} days'))
    total = sum(r["cost"] for r in rows)
    return jsonify({"team": team_name, "days": days, "total_cost": total, "daily": rows})

@app.route("/usage/report")
def weekly():
    return jsonify({"report": generate_weekly_report()})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5051)

What to Build Next

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems