Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Usage Recording Flow

Overview

The Usage Recording Flow is a comprehensive traffic monitoring and billing system in the Telecom Module that tracks user bandwidth consumption, applies billing multipliers, and processes package usage. This system ensures accurate billing while providing detailed analytics for both users and administrators.

The flow consists of three main phases:

  1. Data Collection: Node servers report raw traffic data
  2. Aggregation: Raw usage is collected, multiplied by traffic factors, and prepared for billing
  3. Billing: Aggregated usage is applied to user packages, potentially expiring them when limits are exceeded

System Architecture

┌─────────────────┐    ┌──────────────────────┐    ┌─────────────────────┐
│   Node Servers  │───▶│ Traffic Report APIs  │───▶│  Raw Usage Storage  │
└─────────────────┘    └──────────────────────┘    └─────────────────────┘
                                                              │
                                                              ▼
┌─────────────────────┐    ┌──────────────────┐    ┌──────────────────────┐
│ Package Expiry      │◀───│   Billing Hook   │◀───│ Cron Jobs - Traffic  │
│ Events              │    │                  │    │ Billing              │
└─────────────────────┘    └──────────────────┘    └──────────────────────┘
          ▲                          │                        │
          │                          ▼                        ▼
┌─────────────────────┐    ┌──────────────────┐    ┌──────────────────────┐
│ Package Expiration  │    │ Package Usage    │    │ Usage Aggregation    │
│ Check               │    │ Update           │    │                      │
└─────────────────────┘    └──────────────────┘    └──────────────────────┘
                                                              │
                                                              ▼
                                                    ┌──────────────────────┐
                                                    │   Message Queue      │
                                                    │ (UserUsageBilling    │
                                                    │  Event)              │
                                                    └──────────────────────┘

Data Flow:

  1. Node Servers → Report traffic via APIs (ReportTraffic, UploadUniProxyTraffic)
  2. Traffic Report APIs → Store raw usage in user_traffic_usage table
  3. Cron Jobs → Periodically gather unbilled usage and aggregate with traffic factors
  4. Usage Aggregation → Create UserUsageBillingEvent messages
  5. Message Queue → Distribute billing events to processing hooks
  6. Billing Hook → Apply usage to user packages and check limits
  7. Package Usage Update → Update package usage counters
  8. Package Expiration Check → Determine if package limits exceeded
  9. Package Expiry Events → Trigger package expiration workflows

Core Components

1. Data Collection Layer

Node Traffic Reporting APIs

  • ReportTraffic: Legacy SSP-compatible traffic reporting
  • UploadUniProxyTraffic: Modern UniProxy traffic reporting

Both APIs collect traffic reports from node servers and store them as raw usage records.

2. Data Storage

user_traffic_usage Table

Stores individual traffic records with the following key fields:

  • user_id: User UUID
  • upload/download: Raw traffic in bytes
  • node_client_id: Source node information
  • timestamp: When traffic occurred
  • has_been_billed: Billing status flag

3. Aggregation & Billing System

Components:

  • TelecomCronExecutor: Scheduled jobs for billing
  • GatherUnbilledUsage: Aggregates and marks unbilled traffic
  • UserUsageBillingEvent: Message queue events for billing
  • TelecomBillingHook: Processes billing events and updates packages

Detailed Flow Walkthrough

Phase 1: Data Collection

Node servers periodically report traffic usage to the system:

// Example: UniProxy traffic reporting
let report = UploadUniProxyTraffic {
    node_id: 1,
    records: vec![
        TrafficReportRecord {
            user_id: 12345, // number_id from token system
            upload: 1048576,   // 1MB uploaded
            download: 5242880, // 5MB downloaded
        }
    ]
};

// Process the report
let result = node_server_service.process(report).await?;

Key Processing Steps:

  1. Validation: Only records with total traffic > 10KB are processed
  2. User Resolution: The number_id is resolved to actual user_id via tokens
  3. Node Matching: System finds the appropriate node_client based on user’s active package
  4. Storage: Raw usage is stored in user_traffic_usage with has_been_billed = FALSE
// Internal process: InsertTrafficReportBatch
impl Processor<InsertTrafficReportBatch, Result<(), sqlx::Error>> for DatabaseProcessor {
    async fn process(&self, input: InsertTrafficReportBatch) -> Result<(), sqlx::Error> {
        // Complex SQL that resolves number_id -> user_id -> node_client
        // and inserts traffic records with proper node association
    }
}

Phase 2: Aggregation & Billing Preparation

The system runs periodic cron jobs (typically every few minutes) to process unbilled usage:

// Cron job execution
impl Processor<PrimitiveDateTime, Result<Box<[BillTrafficJob]>, Error>> for TelecomCronExecutor {
    async fn process(&self, _input: PrimitiveDateTime) -> Result<Box<[BillTrafficJob]>, Error> {
        // Gather all unbilled usage and create billing jobs
        let find_result = self
            .db
            .process(GatherUnbilledUsage)  // Critical aggregation step
            .await?
            .into_iter()
            .map(BillTrafficJob)
            .collect::<Box<[_]>>();
        Ok(find_result)
    }
}

Critical Aggregation Process (GatherUnbilledUsage):

-- This query does several important things:
WITH updated AS (
    UPDATE "telecom"."user_traffic_usage" AS u
    SET has_been_billed = TRUE  -- Mark as billed to prevent double-billing
    FROM "telecom"."node_client" AS nc
    WHERE u.node_client_id = nc.id AND u.has_been_billed = FALSE
    RETURNING
        nc.server_id,
        u.user_id,
        CEIL(u.download::numeric * nc.traffic_factor) AS billed_download,  -- Apply traffic multiplier
        CEIL(u.upload::numeric * nc.traffic_factor) AS billed_upload,
        u.timestamp
)
SELECT
    server_id,
    user_id,
    SUM(billed_download)::BIGINT AS billed_download,
    SUM(billed_upload)::BIGINT AS billed_upload,
    MAX(timestamp) AS time
FROM updated
GROUP BY server_id, user_id  -- Aggregate by server and user

Key Operations:

  1. Atomic Marking: Marks records as billed to prevent race conditions
  2. Traffic Factor Application: Applies node-specific billing multipliers (nc.traffic_factor)
  3. Aggregation: Groups usage by server and user for efficient processing
  4. Ceiling Function: Ensures no fractional billing (always rounds up)

Phase 3: Package Billing & Expiration

For each aggregated usage record, the system publishes a billing event:

// Create and send billing event
let event = UserUsageBillingEvent {
    server_id: item.server_id,
    user: item.user_id,
    billed_download: item.billed_download,
    billed_upload: item.billed_upload,
    time: start_time.assume_utc().unix_timestamp() as u64,
};
event.send(&mq).await?;  // Send to message queue

The TelecomBillingHook consumes these events and applies usage to user packages:

impl Processor<UserUsageBillingEvent, Result<(), Error>> for TelecomBillingHook {
    async fn process(&self, event: UserUsageBillingEvent) -> Result<(), Error> {
        // Apply usage to user's active package
        let Some(record) = self
            .db
            .process(RecordPackageUsage {
                user_id: event.user,
                upload: event.billed_upload,
                download: event.billed_download,
            })
            .await?
        else {
            error!(user_id = %event.user, "Cannot find package for user");
            return Err(Error::NotFound);
        };

        // Check if package exceeded limits
        if record.expired {
            // Send package expiration event
            let ev = PackageExpiringEvent {
                item_id: record.item_id,
                user_id: record.user_id,
                package_id: record.package_id,
                expired_at: event.time,
                reason: PackageExpiredReason::Usage,  // Expired due to traffic usage
            };
            ev.send(&self.mq).await?;
        }
        Ok(())
    }
}

Package Usage Recording (RecordPackageUsage):

This is the most critical operation that:

  1. Finds user’s currently active package
  2. Adds billed traffic to package usage counters
  3. Checks if total usage exceeds package limits
  4. Automatically transitions package status to ‘consumed’ if limits exceeded
-- Simplified version of the package usage update query
UPDATE "telecom"."package_queue" AS pq
SET upload = pq.upload + $2,
    download = pq.download + $3,
    status = CASE
        WHEN pq.upload + $2 + pq.download + $3 >= p.traffic_limit + pq.adjust_quota
            THEN 'consumed'::telecom.live_package_status
        ELSE 'active'::telecom.live_package_status
    END
FROM "telecom"."packages" AS p
WHERE pq.user_id = $1 AND pq.package_id = p.id

Developer Usage Guide

Adding New Traffic Sources

To add support for new node types or reporting formats:

  1. Create Traffic Report Structure:
#[derive(Debug, Clone, PartialEq)]
pub struct MyCustomTrafficReport {
    pub node_id: i32,
    pub usage_records: Vec<MyCustomRecord>,
}
  1. Implement Processor:
impl Processor<MyCustomTrafficReport, Result<ReportResult, Error>> for NodeServerService {
    async fn process(&self, input: MyCustomTrafficReport) -> Result<ReportResult, Error> {
        // Convert to standard TrafficReportRecord format
        let records: Vec<TrafficReportRecord> = input
            .usage_records
            .into_iter()
            .map(|r| TrafficReportRecord {
                number_id: r.user_number,
                upload: r.sent_bytes,
                download: r.received_bytes,
            })
            .filter(|r| r.download + r.upload > 10_000)  // Filter minimum threshold
            .collect();

        // Use existing batch insertion
        self.db.process(InsertTrafficReportBatch {
            server_id: input.node_id,
            timestamp: now(),
            records,
        }).await?;

        Ok(ReportResult::Ok)
    }
}

Querying Usage Data

Get User’s Recent Usage

// Get hourly usage for the last 24 hours
let usage_data = db.process(GetUserHourlyUsage {
    user: user_id,
    begin: now - Duration::hours(24),
    end: now,
}).await?;

for record in usage_data {
    println!("Hour: {}, Raw: {}MB, Billed: {}MB",
        record.time,
        (record.upload + record.download) / 1_048_576,
        (record.billed_upload + record.billed_download) / 1_048_576
    );
}

Monitor Unbilled Usage

// Check for pending billing (useful for monitoring)
let unbilled = db.process(GatherUnbilledUsage).await?;
println!("Found {} users with unbilled usage", unbilled.len());

Analytics & Reporting

Traffic Factor Impact Analysis

// Compare raw vs billed traffic to understand traffic factor impact
let usage = db.process(GetUserDailyUsage {
    user_id,
    begin: start_date,
    end: end_date,
}).await?;

for day in usage {
    let raw_total = day.upload + day.download;
    let billed_total = day.billed_upload + day.billed_download;
    let factor = billed_total as f64 / raw_total as f64;
    println!("Date: {}, Factor: {:.2}x", day.time, factor);
}

Important Developer Considerations

1. Race Conditions & Data Integrity

  • Billing Flag: The has_been_billed flag prevents double-billing during concurrent cron jobs
  • Atomic Updates: Use database transactions for critical operations
  • Event Ordering: Message queue ensures billing events are processed in order

2. Traffic Factor System

  • Node-Specific Multipliers: Each node_client has a traffic_factor (e.g., 1.0, 1.5, 2.0)
  • Ceiling Rounding: Always rounds up to prevent under-billing
  • Factor Changes: Changing factors only affects new traffic, not historical data

3. Performance Optimization

  • Minimum Threshold: Only records with >10KB total traffic are processed
  • Batch Processing: Traffic reports are processed in batches for efficiency
  • Index Usage: Ensure proper indexing on user_id, has_been_billed, and timestamp

4. Error Handling

// Always handle missing package scenarios
if let Some(record) = db.process(RecordPackageUsage { ... }).await? {
    // Process successful usage recording
} else {
    // User has no active package - this is expected for expired/inactive users
    warn!("User {} has no active package for billing", user_id);
}

5. Monitoring & Alerting

Key metrics to monitor:

  • Unbilled Records: Should trend toward zero between cron runs
  • Failed Billing Events: Errors in TelecomBillingHook processing
  • Package Expiration Rate: Monitor PackageExpiringEvent frequency
  • Traffic Factor Distribution: Ensure factors are applied correctly

6. Testing Considerations

When writing tests:

// Always test with realistic traffic factors
let test_factor = 1.5;
let raw_usage = 1_000_000; // 1MB
let expected_billed = (raw_usage as f64 * test_factor).ceil() as i64; // 1,500,000

// Test edge cases around package limits
let package_limit = 1_000_000_000; // 1GB
let usage_just_under = package_limit - 1;
let usage_just_over = package_limit + 1;

This usage recording system provides robust, scalable traffic monitoring with accurate billing that scales to handle high-traffic proxy networks while maintaining data integrity and providing detailed analytics.