Usage Recording Flow
Overview
The Usage Recording Flow is a comprehensive traffic monitoring and billing system in the Telecom Module that tracks user bandwidth consumption, applies billing multipliers, and processes package usage. This system ensures accurate billing while providing detailed analytics for both users and administrators.
The flow consists of three main phases:
- Data Collection: Node servers report raw traffic data
- Aggregation: Raw usage is collected, multiplied by traffic factors, and prepared for billing
- Billing: Aggregated usage is applied to user packages, potentially expiring them when limits are exceeded
System Architecture
┌─────────────────┐ ┌──────────────────────┐ ┌─────────────────────┐
│ Node Servers │───▶│ Traffic Report APIs │───▶│ Raw Usage Storage │
└─────────────────┘ └──────────────────────┘ └─────────────────────┘
│
▼
┌─────────────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ Package Expiry │◀───│ Billing Hook │◀───│ Cron Jobs - Traffic │
│ Events │ │ │ │ Billing │
└─────────────────────┘ └──────────────────┘ └──────────────────────┘
▲ │ │
│ ▼ ▼
┌─────────────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ Package Expiration │ │ Package Usage │ │ Usage Aggregation │
│ Check │ │ Update │ │ │
└─────────────────────┘ └──────────────────┘ └──────────────────────┘
│
▼
┌──────────────────────┐
│ Message Queue │
│ (UserUsageBilling │
│ Event) │
└──────────────────────┘
Data Flow:
- Node Servers → Report traffic via APIs (ReportTraffic, UploadUniProxyTraffic)
- Traffic Report APIs → Store raw usage in the user_traffic_usage table
- Cron Jobs → Periodically gather unbilled usage and aggregate with traffic factors
- Usage Aggregation → Create UserUsageBillingEvent messages
- Message Queue → Distribute billing events to processing hooks
- Billing Hook → Apply usage to user packages and check limits
- Package Usage Update → Update package usage counters
- Package Expiration Check → Determine if package limits exceeded
- Package Expiry Events → Trigger package expiration workflows
Core Components
1. Data Collection Layer
Node Traffic Reporting APIs
- ReportTraffic: Legacy SSP-compatible traffic reporting
- UploadUniProxyTraffic: Modern UniProxy traffic reporting
Both APIs collect traffic reports from node servers and store them as raw usage records.
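Both APIs reduce their payloads to the same per-user record shape. The struct below is a hedged sketch of that shape, inferred from the examples later in this document; the field types are assumptions, not the module's actual definition.
#[derive(Debug, Clone, PartialEq)]
pub struct TrafficReportRecord {
    // Token-level identifier reported by the node; resolved to a user_id later
    pub number_id: i64,
    // Raw bytes transferred since the previous report
    pub upload: i64,
    pub download: i64,
}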
2. Data Storage
user_traffic_usage Table
Stores individual traffic records with the following key fields:
- user_id: User UUID
- upload / download: Raw traffic in bytes
- node_client_id: Source node information
- timestamp: When traffic occurred
- has_been_billed: Billing status flag
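For orientation, a hypothetical sqlx row mapping for this table might look like the following; the column types are assumptions based on the field descriptions above, not the actual schema.
use time::PrimitiveDateTime;
use uuid::Uuid;

#[derive(sqlx::FromRow)]
struct UserTrafficUsageRow {
    user_id: Uuid,                // user UUID
    upload: i64,                  // raw bytes uploaded
    download: i64,                // raw bytes downloaded
    node_client_id: i32,          // source node information
    timestamp: PrimitiveDateTime, // when the traffic occurred
    has_been_billed: bool,        // billing status flag
}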
3. Aggregation & Billing System
Components:
- TelecomCronExecutor: Scheduled jobs for billing
- GatherUnbilledUsage: Aggregates and marks unbilled traffic
- UserUsageBillingEvent: Message queue events for billing
- TelecomBillingHook: Processes billing events and updates packages
Detailed Flow Walkthrough
Phase 1: Data Collection
Node servers periodically report traffic usage to the system:
// Example: UniProxy traffic reporting
let report = UploadUniProxyTraffic {
node_id: 1,
records: vec![
TrafficReportRecord {
number_id: 12345, // token-level ID from the token system; resolved to a user_id during processing
upload: 1048576, // 1MB uploaded
download: 5242880, // 5MB downloaded
}
]
};
// Process the report
let result = node_server_service.process(report).await?;
Key Processing Steps:
- Validation: Only records with total traffic > 10KB are processed
- User Resolution: The number_id is resolved to the actual user_id via tokens
- Node Matching: The system finds the appropriate node_client based on the user's active package
- Storage: Raw usage is stored in user_traffic_usage with has_been_billed = FALSE
// Internal process: InsertTrafficReportBatch
impl Processor<InsertTrafficReportBatch, Result<(), sqlx::Error>> for DatabaseProcessor {
async fn process(&self, input: InsertTrafficReportBatch) -> Result<(), sqlx::Error> {
// Complex SQL that resolves number_id -> user_id -> node_client
// and inserts traffic records with proper node association
}
}
Phase 2: Aggregation & Billing Preparation
The system runs periodic cron jobs (typically every few minutes) to process unbilled usage:
// Cron job execution
impl Processor<PrimitiveDateTime, Result<Box<[BillTrafficJob]>, Error>> for TelecomCronExecutor {
async fn process(&self, _input: PrimitiveDateTime) -> Result<Box<[BillTrafficJob]>, Error> {
// Gather all unbilled usage and create billing jobs
let find_result = self
.db
.process(GatherUnbilledUsage) // Critical aggregation step
.await?
.into_iter()
.map(BillTrafficJob)
.collect::<Box<[_]>>();
Ok(find_result)
}
}
Critical Aggregation Process (GatherUnbilledUsage):
-- This query does several important things:
WITH updated AS (
UPDATE "telecom"."user_traffic_usage" AS u
SET has_been_billed = TRUE -- Mark as billed to prevent double-billing
FROM "telecom"."node_client" AS nc
WHERE u.node_client_id = nc.id AND u.has_been_billed = FALSE
RETURNING
nc.server_id,
u.user_id,
CEIL(u.download::numeric * nc.traffic_factor) AS billed_download, -- Apply traffic multiplier
CEIL(u.upload::numeric * nc.traffic_factor) AS billed_upload,
u.timestamp
)
SELECT
server_id,
user_id,
SUM(billed_download)::BIGINT AS billed_download,
SUM(billed_upload)::BIGINT AS billed_upload,
MAX(timestamp) AS time
FROM updated
GROUP BY server_id, user_id -- Aggregate by server and user
Key Operations:
- Atomic Marking: Marks records as billed to prevent race conditions
- Traffic Factor Application: Applies node-specific billing multipliers (nc.traffic_factor)
- Aggregation: Groups usage by server and user for efficient processing
- Ceiling Function: Ensures no fractional billing (always rounds up)
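The multiplier-and-ceiling step can be expressed in a few lines of Rust; this is an illustrative sketch of the arithmetic only, not code from the module.
fn billed_bytes(raw_bytes: u64, traffic_factor: f64) -> u64 {
    // Mirrors CEIL(raw * factor): always round up so billing never undercounts
    (raw_bytes as f64 * traffic_factor).ceil() as u64
}

fn main() {
    // 1 MiB of raw traffic on a node with traffic_factor 1.5 bills as 1.5 MiB
    assert_eq!(billed_bytes(1_048_576, 1.5), 1_572_864);
}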
Phase 3: Package Billing & Expiration
For each aggregated usage record, the system publishes a billing event:
// Create and send billing event
let event = UserUsageBillingEvent {
server_id: item.server_id,
user: item.user_id,
billed_download: item.billed_download,
billed_upload: item.billed_upload,
time: start_time.assume_utc().unix_timestamp() as u64,
};
event.send(&mq).await?; // Send to message queue
The TelecomBillingHook consumes these events and applies usage to user packages:
impl Processor<UserUsageBillingEvent, Result<(), Error>> for TelecomBillingHook {
async fn process(&self, event: UserUsageBillingEvent) -> Result<(), Error> {
// Apply usage to user's active package
let Some(record) = self
.db
.process(RecordPackageUsage {
user_id: event.user,
upload: event.billed_upload,
download: event.billed_download,
})
.await?
else {
error!(user_id = %event.user, "Cannot find package for user");
return Err(Error::NotFound);
};
// Check if package exceeded limits
if record.expired {
// Send package expiration event
let ev = PackageExpiringEvent {
item_id: record.item_id,
user_id: record.user_id,
package_id: record.package_id,
expired_at: event.time,
reason: PackageExpiredReason::Usage, // Expired due to traffic usage
};
ev.send(&self.mq).await?;
}
Ok(())
}
}
Package Usage Recording (RecordPackageUsage):
This is the most critical operation that:
- Finds user’s currently active package
- Adds billed traffic to package usage counters
- Checks if total usage exceeds package limits
- Automatically transitions package status to ‘consumed’ if limits exceeded
-- Simplified version of the package usage update query
UPDATE "telecom"."package_queue" AS pq
SET upload = pq.upload + $2,
download = pq.download + $3,
status = CASE
WHEN pq.upload + $2 + pq.download + $3 >= p.traffic_limit + pq.adjust_quota
THEN 'consumed'::telecom.live_package_status
ELSE 'active'::telecom.live_package_status
END
FROM "telecom"."packages" AS p
WHERE pq.user_id = $1 AND pq.package_id = p.id
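The row returned by this update is what TelecomBillingHook inspects. Below is a hypothetical sketch of its shape, inferred from the fields accessed in the hook above; the field names come from that code, while the struct name and types are assumptions.
use uuid::Uuid;

struct PackageUsageRecord {
    item_id: Uuid,    // package_queue entry that was charged
    user_id: Uuid,    // owner of the package
    package_id: Uuid, // package definition the entry belongs to
    expired: bool,    // true when the CASE above moved the status to 'consumed'
}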
Developer Usage Guide
Adding New Traffic Sources
To add support for new node types or reporting formats:
- Create Traffic Report Structure:
#[derive(Debug, Clone, PartialEq)]
pub struct MyCustomTrafficReport {
pub node_id: i32,
pub usage_records: Vec<MyCustomRecord>,
}
- Implement Processor:
impl Processor<MyCustomTrafficReport, Result<ReportResult, Error>> for NodeServerService {
async fn process(&self, input: MyCustomTrafficReport) -> Result<ReportResult, Error> {
// Convert to standard TrafficReportRecord format
let records: Vec<TrafficReportRecord> = input
.usage_records
.into_iter()
.map(|r| TrafficReportRecord {
number_id: r.user_number,
upload: r.sent_bytes,
download: r.received_bytes,
})
.filter(|r| r.download + r.upload > 10_000) // Filter minimum threshold
.collect();
// Use existing batch insertion
self.db.process(InsertTrafficReportBatch {
server_id: input.node_id,
timestamp: now(),
records,
}).await?;
Ok(ReportResult::Ok)
}
}
Querying Usage Data
Get User’s Recent Usage
// Get hourly usage for the last 24 hours
let usage_data = db.process(GetUserHourlyUsage {
user: user_id,
begin: now - Duration::hours(24),
end: now,
}).await?;
for record in usage_data {
println!("Hour: {}, Raw: {}MB, Billed: {}MB",
record.time,
(record.upload + record.download) / 1_048_576,
(record.billed_upload + record.billed_download) / 1_048_576
);
}
Monitor Unbilled Usage
// Check for pending billing. Note: GatherUnbilledUsage marks the rows it returns
// as billed (see Phase 2), so running it here also consumes them; use it for
// monitoring only where that side effect is acceptable.
let unbilled = db.process(GatherUnbilledUsage).await?;
println!("Found {} users with unbilled usage", unbilled.len());
Analytics & Reporting
Traffic Factor Impact Analysis
// Compare raw vs billed traffic to understand traffic factor impact
let usage = db.process(GetUserDailyUsage {
user_id,
begin: start_date,
end: end_date,
}).await?;
for day in usage {
    let raw_total = day.upload + day.download;
    let billed_total = day.billed_upload + day.billed_download;
    if raw_total == 0 {
        continue; // skip idle days to avoid dividing by zero
    }
    let factor = billed_total as f64 / raw_total as f64;
    println!("Date: {}, Factor: {:.2}x", day.time, factor);
}
Important Developer Considerations
1. Race Conditions & Data Integrity
- Billing Flag: The has_been_billed flag prevents double-billing during concurrent cron jobs
- Atomic Updates: Use database transactions for critical operations (see the sketch after this list)
- Event Ordering: Message queue ensures billing events are processed in order
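A minimal sketch of the atomic-update point, assuming a sqlx PgPool handle; the helper function, the id column, and the statement are illustrative only, not the module's actual code.
use sqlx::PgPool;

// Hypothetical helper: mark a single usage row as billed inside a transaction so
// a concurrent cron run cannot bill it twice.
async fn mark_billed(pool: &PgPool, usage_id: i64) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    sqlx::query(
        r#"UPDATE "telecom"."user_traffic_usage" SET has_been_billed = TRUE WHERE id = $1"#,
    )
    .bind(usage_id)
    .execute(&mut *tx)
    .await?;
    tx.commit().await?;
    Ok(())
}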
2. Traffic Factor System
- Node-Specific Multipliers: Each node_client has a traffic_factor (e.g., 1.0, 1.5, 2.0)
- Ceiling Rounding: Always rounds up to prevent under-billing
- Factor Changes: Changing factors only affects new traffic, not historical data
3. Performance Optimization
- Minimum Threshold: Only records with >10KB total traffic are processed
- Batch Processing: Traffic reports are processed in batches for efficiency
- Index Usage: Ensure proper indexing on user_id, has_been_billed, and timestamp
4. Error Handling
// Always handle missing package scenarios
if let Some(record) = db.process(RecordPackageUsage { ... }).await? {
// Process successful usage recording
} else {
// User has no active package - this is expected for expired/inactive users
warn!("User {} has no active package for billing", user_id);
}
5. Monitoring & Alerting
Key metrics to monitor:
- Unbilled Records: Should trend toward zero between cron runs
- Failed Billing Events: Errors in TelecomBillingHook processing
- Package Expiration Rate: Monitor PackageExpiringEvent frequency
- Traffic Factor Distribution: Ensure factors are applied correctly
6. Testing Considerations
When writing tests:
// Always test with realistic traffic factors
let test_factor = 1.5_f64;
let raw_usage: i64 = 1_000_000; // 1 MB
let expected_billed = (raw_usage as f64 * test_factor).ceil() as i64;
assert_eq!(expected_billed, 1_500_000);

// Test edge cases around package limits
let package_limit: i64 = 1_000_000_000; // 1 GB
let usage_just_under = package_limit - 1; // should leave the package 'active'
let usage_just_over = package_limit + 1;  // should flip the package to 'consumed'
This usage recording system provides robust traffic monitoring and accurate billing for high-traffic proxy networks, scaling to heavy loads while maintaining data integrity and offering detailed analytics.