Package Queue
Overview
The Package Queue is a core component of the Telecom module that manages user packages in a queue-based system. It ensures that users can have multiple packages but only one active package at a time, with automatic activation of the next package when the current one expires.
Concept
What is Package Queue?
The Package Queue is a system that manages the lifecycle of telecom packages for users. Think of it as a “playlist” for packages: users can have multiple packages in their queue, but only one plays (is active) at a time. When the current package expires or is consumed, the system automatically activates the next package in line.
Key Characteristics
- Single Active Package Rule: Each user can only have one active package at any given time
- FIFO Queue: Packages are activated in First-In-First-Out order based on the created_at timestamp
- Automatic Activation: When an active package expires, the next queued package is automatically activated
- Traffic Tracking: Each package tracks upload/download usage with quota adjustments
- Event-Driven: Package lifecycle changes trigger events for other system components
Package States
pub enum LivePackageStatus {
    /// The package is in the queue, but not active
    InQueue,
    /// The package that the user is using
    Active,
    /// The package that has expired due to time or traffic limits
    Consumed,
    /// The package that was cancelled (e.g., refunded)
    Cancelled,
}
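The lifecycle described in this document implies a small set of allowed state changes. The following is a minimal sketch of those transitions, not the module's actual code: the `can_transition` helper is hypothetical, and the assumption that both queued and active packages can be cancelled is inferred from the status descriptions rather than stated in the source.

```rust
/// Mirrors the LivePackageStatus enum above, with derives added so values can be compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LivePackageStatus {
    InQueue,
    Active,
    Consumed,
    Cancelled,
}

/// Hypothetical helper: returns true if moving from `from` to `to` is allowed
/// under the assumed rules (Consumed and Cancelled are treated as terminal).
pub fn can_transition(from: LivePackageStatus, to: LivePackageStatus) -> bool {
    use LivePackageStatus::*;
    matches!(
        (from, to),
        (InQueue, Active)        // activation of the next queued package
            | (InQueue, Cancelled) // assumed: refund before activation
            | (Active, Consumed)   // time or traffic limit reached
            | (Active, Cancelled)  // assumed: refund while in use
    )
}
```

Treating `Consumed` and `Cancelled` as terminal keeps the single-active-package rule easy to reason about, because a finished package can never re-enter the queue.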
How it Works
Data Structure
The core data structure is PackageQueueItem:
pub struct PackageQueueItem {
    pub id: i64,
    pub user_id: Uuid,
    pub package_id: i64, // Reference to package definition
    pub by_order: Option<Uuid>, // Optional order ID that created this item
    pub status: LivePackageStatus,
    pub created_at: PrimitiveDateTime,
    pub activated_at: Option<PrimitiveDateTime>,
    // Traffic usage tracking
    pub upload: i64, // Billed upload traffic in bytes
    pub download: i64, // Billed download traffic in bytes
    pub adjust_quota: i64, // Quota adjustment (can be negative or positive)
}
Queue Processing Workflow
1. Package Creation
When packages are purchased, they are added to the queue with InQueue status:
// Single package
CreateQueueItem { user_id, package_id, by_order }
// Multiple identical packages
CreateQueueItems { user_id, package_id, by_order, amount }
2. Automatic Activation
The system automatically activates packages through the process_package_queue_push function:
pub async fn process_package_queue_push(
    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    user_id: Uuid,
) -> Result<PackageQueuePushResult, sqlx::Error>
Activation Logic:
- Check if user has an active package
- If active package exists, do nothing
- If no active package, find the oldest queued package (ORDER BY created_at)
- Activate the found package by setting status = 'active' and activated_at = NOW()
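The activation steps can be illustrated with an in-memory sketch. This is not the real `process_package_queue_push`, which runs SQL inside a transaction; the `Item` struct, the `push_queue` function, the use of plain `u64` timestamps in place of `PrimitiveDateTime`, and the tie-break on `id` for equal timestamps are all assumptions made for illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    InQueue,
    Active,
    Consumed,
}

#[derive(Debug, Clone)]
struct Item {
    id: i64,
    status: Status,
    created_at: u64, // stand-in for PrimitiveDateTime
    activated_at: Option<u64>,
}

/// Activates the oldest queued item if the user has no active item.
/// Returns the id of the newly activated item, or None if nothing changed.
fn push_queue(items: &mut [Item], now: u64) -> Option<i64> {
    // Single Active Package Rule: do nothing if one is already active.
    if items.iter().any(|i| i.status == Status::Active) {
        return None;
    }
    // FIFO: pick the queued item with the smallest created_at
    // (ties broken by id, which is an assumption of this sketch).
    let next = items
        .iter_mut()
        .filter(|i| i.status == Status::InQueue)
        .min_by_key(|i| (i.created_at, i.id))?;
    next.status = Status::Active;
    next.activated_at = Some(now);
    Some(next.id)
}
```

Calling `push_queue` a second time with an item already active returns `None`, which is what makes the operation safe to repeat.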
3. Traffic Usage Recording
Traffic usage is recorded through the billing system:
pub struct RecordPackageUsage {
    pub user_id: Uuid,
    pub upload: i64, // Additional upload traffic to bill
    pub download: i64, // Additional download traffic to bill
}
Billing Logic:
- Find the user’s active package
- Add the new traffic to existing usage counters
- Check whether total usage has reached the limit: (upload + download) >= (traffic_limit + adjust_quota)
- If the limit is reached, automatically set status to Consumed
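As a rough illustration of the billing check, the sketch below applies the same comparison to an in-memory value. The `ActivePackage` struct and `record_usage` function are hypothetical, `traffic_limit` is assumed to come from the package definition, and saturating arithmetic is a choice of this sketch rather than something the source specifies.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Active,
    Consumed,
}

#[derive(Debug)]
struct ActivePackage {
    status: Status,
    upload: i64,       // billed upload bytes so far
    download: i64,     // billed download bytes so far
    adjust_quota: i64, // may be negative or positive
}

/// Adds the new traffic to the counters and returns true if the package
/// became Consumed because (upload + download) >= (traffic_limit + adjust_quota).
fn record_usage(pkg: &mut ActivePackage, traffic_limit: i64, upload: i64, download: i64) -> bool {
    pkg.upload = pkg.upload.saturating_add(upload);
    pkg.download = pkg.download.saturating_add(download);

    let used = pkg.upload.saturating_add(pkg.download);
    let limit = traffic_limit.saturating_add(pkg.adjust_quota);

    if used >= limit {
        pkg.status = Status::Consumed;
        true
    } else {
        false
    }
}
```

With a limit of 100 bytes and an `adjust_quota` of -20, the effective limit is 80, so the package is consumed as soon as combined usage reaches 80.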
4. Package Expiration
A package can expire for one of two reasons:
- Time expiration: Handled by cron jobs that check expire_at timestamps
- Usage expiration: Triggered automatically when traffic limits are exceeded during billing
When a package expires, a PackageExpiringEvent is published.
5. Queue Advancement
When a package expires, the system automatically activates the next package:
- PackageExpiringEvent is consumed by TelecomPackageQueueHook
- Expired package status is updated to Consumed
- System looks for the next InQueue package for the user
- If found, activates it and publishes PackageActivateEvent
- If no more packages, publishes AllPackageExpiredEvent
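The advancement steps can be sketched in memory as follows. The `Outcome` enum is a hypothetical stand-in for publishing `PackageActivateEvent` or `AllPackageExpiredEvent`, and `advance_queue` is an illustrative name, not the hook's real entry point.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    InQueue,
    Active,
    Consumed,
}

#[derive(Debug, Clone)]
struct Item {
    id: i64,
    status: Status,
    created_at: u64, // stand-in for PrimitiveDateTime
}

/// Hypothetical stand-in for the event that would be published.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Activated(i64), // corresponds to PackageActivateEvent for this item id
    AllExpired,     // corresponds to AllPackageExpiredEvent
}

/// Marks the expired item as Consumed, then activates the oldest queued item.
fn advance_queue(items: &mut [Item], expired_id: i64) -> Outcome {
    if let Some(expired) = items.iter_mut().find(|i| i.id == expired_id) {
        expired.status = Status::Consumed;
    }
    let next = items
        .iter_mut()
        .filter(|i| i.status == Status::InQueue)
        .min_by_key(|i| i.created_at);
    match next {
        Some(item) => {
            item.status = Status::Active;
            Outcome::Activated(item.id)
        }
        None => Outcome::AllExpired,
    }
}
```

Returning the outcome instead of publishing inside the function makes it straightforward to emit the event only after the surrounding database transaction has committed.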
Concurrency Control
The system uses Redis-based distributed locks to prevent race conditions:
pub struct PackageQueueLock;
Lock Usage:
- Lock ID: User ID (LockId(user_id))
- TTL: 30 seconds default
- Retry Logic: Up to 10 retries for lock acquisition
- Operations Protected:
  - Package queue push processing
  - Package expiration handling
  - Package activation
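The retry behaviour might look roughly like the sketch below. It does not use the real `PackageQueueLock` or Redis: `acquire_with_retry` is a hypothetical helper, the closure stands in for a single lock attempt, the fixed backoff is an assumption, and "up to 10 retries" is interpreted here as one initial attempt plus at most 10 further attempts.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `try_acquire` until it succeeds or the retry budget is spent.
/// Makes at most `1 + max_retries` attempts and returns whether the lock was obtained.
fn acquire_with_retry<F>(mut try_acquire: F, max_retries: u32, backoff: Duration) -> bool
where
    F: FnMut() -> bool,
{
    for attempt in 0..=max_retries {
        if try_acquire() {
            return true;
        }
        // Wait before the next attempt, but not after the final one.
        if attempt < max_retries {
            sleep(backoff);
        }
    }
    false
}
```

A caller that gets `false` back should skip the queue modification and report or reschedule the work rather than proceed without the lock.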
Event System
The Package Queue publishes several events for system integration:
PackageQueuePushEvent
pub struct PackageQueuePushEvent {
    pub item_ids: Vec<i64>,
    pub user_id: Uuid,
    pub package_id: i64,
    pub pushed_at: u64,
}
- Purpose: Internal event when packages are added to queue
- Route: telecom.package_queuing
PackageActivateEvent
pub struct PackageActivateEvent {
    pub item_id: i64,
    pub user_id: Uuid,
    pub package_id: i64,
    pub activated_at: u64,
}
- Purpose: Internal event when a package becomes active
- Route: telecom.package_activate
PackageExpiringEvent
pub struct PackageExpiringEvent {
    pub item_id: i64,
    pub user_id: Uuid,
    pub package_id: i64,
    pub expired_at: u64,
    pub reason: PackageExpiredReason, // Time or Usage
}
- Purpose: Internal event when a package expires
- Route: telecom.package_expiring
Service Layer
The PackageQueueService provides high-level operations:
pub struct PackageQueueService {
    pub db: DatabaseProcessor,
    pub redis: RedisConnection,
}
Available Operations:
- GetCurrentPackage: Get user’s active package info
- GetAllMyPackages: List all packages for a user
Database Schema
The package queue is stored in the telecom.package_queue table with the following key indexes:
- User-based queries: (user_id, status)
- Queue ordering: (user_id, status, created_at)
- Package lookup: (package_id)
Usage Examples
For Developers
Adding Packages to Queue
// Add single package
let item = db.process(CreateQueueItem {
    user_id: user.id,
    package_id: package.id,
    by_order: Some(order.id),
}).await?;
// Add multiple identical packages
let items = db.process(CreateQueueItems {
    user_id: user.id,
    package_id: package.id,
    by_order: Some(order.id),
    amount: 5,
}).await?;
Getting Active Package
let service = PackageQueueService { db, redis };
let active_package = service.process(GetCurrentPackage {
    user_id: user.id,
}).await?;
Recording Traffic Usage
// This is typically done by the billing system
let record = db.process(RecordPackageUsage {
    user_id: user.id,
    upload: 1024 * 1024, // 1MB upload
    download: 10 * 1024 * 1024, // 10MB download
}).await?;
if record.map(|r| r.expired).unwrap_or(false) {
    // Package expired due to usage, will trigger queue advancement
}
Integration Points
- Shop Module: Creates queue items when users purchase packages
- Billing System: Records traffic usage and triggers expiration
- Node Management: Checks active packages for user access control
- Admin Interface: Views and manages user package queues
Best Practices
- Always use transactions when modifying package queue state
- Handle lock acquisition failures gracefully with retries
- Listen to package events for system integration
- Consider quota adjustments when calculating available traffic
- Test concurrent scenarios, given the multi-user nature of the system
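To illustrate the point about quota adjustments, the sketch below computes the traffic still available to a package. The `remaining_traffic` function is hypothetical, `traffic_limit` is assumed to come from the package definition, and clamping the result at zero is a design choice of this sketch.

```rust
/// Remaining billable bytes for a package, never below zero.
/// `adjust_quota` may be negative, which shrinks the effective limit.
fn remaining_traffic(traffic_limit: i64, adjust_quota: i64, upload: i64, download: i64) -> i64 {
    let effective_limit = traffic_limit.saturating_add(adjust_quota);
    let used = upload.saturating_add(download);
    effective_limit.saturating_sub(used).max(0)
}
```

Clamping at zero means a negative adjustment larger than the package limit shows up as "no traffic left" instead of a negative balance.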
Common Pitfalls
- Race Conditions: Always use Redis locks when modifying queue state
- Transaction Boundaries: Ensure event publishing happens after database commits
- Zero-Duration Packages: Handle edge cases where packages expire immediately
- Quota Calculations: Remember that adjust_quota can be negative
- Event Ordering: Package events may arrive out of order in distributed systems