Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Package Queue

Overview

The Package Queue is a core component of the Telecom module that manages user packages in a queue-based system. It ensures that users can have multiple packages but only one active package at a time, with automatic activation of the next package when the current one expires.

Concept

What is Package Queue?

The Package Queue is a system that manages the lifecycle of telecom packages for users. Think of it as a “playlist” for packages - users can have multiple packages in their queue, but only one plays (is active) at a time. When the current package expires or is consumed, the system automatically activates the next package in line.

Key Characteristics

  1. Single Active Package Rule: Each user can only have one active package at any given time
  2. FIFO Queue: Packages are activated in First-In-First-Out order based on created_at timestamp
  3. Automatic Activation: When an active package expires, the next queued package is automatically activated
  4. Traffic Tracking: Each package tracks upload/download usage with quota adjustments
  5. Event-Driven: Package lifecycle changes trigger events for other system components

Package States

pub enum LivePackageStatus {
    /// The package is in the queue, but not active
    InQueue,

    /// The package that the user is using
    Active,

    /// The package that has expired due to time or traffic limits
    Consumed,

    /// The package that was cancelled (e.g., refunded)
    Cancelled,
}

How it Works

Data Structure

The core data structure is PackageQueueItem:

pub struct PackageQueueItem {
    pub id: i64,
    pub user_id: Uuid,
    pub package_id: i64,              // Reference to package definition
    pub by_order: Option<Uuid>,       // Optional order ID that created this item
    pub status: LivePackageStatus,
    pub created_at: PrimitiveDateTime,
    pub activated_at: Option<PrimitiveDateTime>,

    // Traffic usage tracking
    pub upload: i64,                  // Billed upload traffic in bytes
    pub download: i64,                // Billed download traffic in bytes
    pub adjust_quota: i64,           // Quota adjustment (can be negative or positive)
}

Queue Processing Workflow

1. Package Creation

When packages are purchased, they are added to the queue with InQueue status:

// Single package
CreateQueueItem { user_id, package_id, by_order }

// Multiple identical packages
CreateQueueItems { user_id, package_id, by_order, amount }

2. Automatic Activation

The system automatically activates packages through the process_package_queue_push function:

pub async fn process_package_queue_push(
    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    user_id: Uuid,
) -> Result<PackageQueuePushResult, sqlx::Error>

Activation Logic:

  1. Check if user has an active package
  2. If active package exists, do nothing
  3. If no active package, find the oldest queued package (ORDER BY created_at)
  4. Activate the found package by setting status = 'active' and activated_at = NOW()

3. Traffic Usage Recording

Traffic usage is recorded through the billing system:

pub struct RecordPackageUsage {
    pub user_id: Uuid,
    pub upload: i64,    // Additional upload traffic to bill
    pub download: i64,  // Additional download traffic to bill
}

Billing Logic:

  1. Find the user’s active package
  2. Add the new traffic to existing usage counters
  3. Check if total usage exceeds limit: (upload + download) >= (traffic_limit + adjust_quota)
  4. If limit exceeded, automatically set status to Consumed

4. Package Expiration

Packages can expire due to two reasons:

  • Time expiration: Handled by cron jobs that check expire_at timestamps
  • Usage expiration: Triggered automatically when traffic limits are exceeded during billing

When a package expires, a PackageExpiringEvent is published.

5. Queue Advancement

When a package expires, the system automatically activates the next package:

  1. PackageExpiringEvent is consumed by TelecomPackageQueueHook
  2. Expired package status is updated to Consumed
  3. System looks for the next InQueue package for the user
  4. If found, activates it and publishes PackageActivateEvent
  5. If no more packages, publishes AllPackageExpiredEvent

Concurrency Control

The system uses Redis-based distributed locks to prevent race conditions:

pub struct PackageQueueLock;

Lock Usage:

  • Lock ID: User ID (LockId(user_id))
  • TTL: 30 seconds default
  • Retry Logic: Up to 10 retries for lock acquisition
  • Operations Protected:
    • Package queue push processing
    • Package expiration handling
    • Package activation

Event System

The Package Queue publishes several events for system integration:

PackageQueuePushEvent

pub struct PackageQueuePushEvent {
    pub item_ids: Vec<i64>,
    pub user_id: Uuid,
    pub package_id: i64,
    pub pushed_at: u64,
}
  • Purpose: Internal event when packages are added to queue
  • Route: telecom.package_queuing

PackageActivateEvent

pub struct PackageActivateEvent {
    pub item_id: i64,
    pub user_id: Uuid,
    pub package_id: i64,
    pub activated_at: u64,
}
  • Purpose: Internal event when a package becomes active
  • Route: telecom.package_activate

PackageExpiringEvent

pub struct PackageExpiringEvent {
    pub item_id: i64,
    pub user_id: Uuid,
    pub package_id: i64,
    pub expired_at: u64,
    pub reason: PackageExpiredReason, // Time or Usage
}
  • Purpose: Internal event when a package expires
  • Route: telecom.package_expiring

Service Layer

The PackageQueueService provides high-level operations:

pub struct PackageQueueService {
    pub db: DatabaseProcessor,
    pub redis: RedisConnection,
}

Available Operations:

  • GetCurrentPackage: Get user’s active package info
  • GetAllMyPackages: List all packages for a user

Database Schema

The package queue is stored in the telecom.package_queue table with the following key indexes:

  • User-based queries: (user_id, status)
  • Queue ordering: (user_id, status, created_at)
  • Package lookup: (package_id)

Usage Examples

For Developers

Adding Packages to Queue

// Add single package
let item = db.process(CreateQueueItem {
    user_id: user.id,
    package_id: package.id,
    by_order: Some(order.id),
}).await?;

// Add multiple identical packages
let items = db.process(CreateQueueItems {
    user_id: user.id,
    package_id: package.id,
    by_order: Some(order.id),
    amount: 5,
}).await?;

Getting Active Package

let service = PackageQueueService { db, redis };
let active_package = service.process(GetCurrentPackage {
    user_id: user.id,
}).await?;

Recording Traffic Usage

// This is typically done by the billing system
let record = db.process(RecordPackageUsage {
    user_id: user.id,
    upload: 1024 * 1024,    // 1MB upload
    download: 10 * 1024 * 1024,  // 10MB download
}).await?;

if record.map(|r| r.expired).unwrap_or(false) {
    // Package expired due to usage, will trigger queue advancement
}

Integration Points

  1. Shop Module: Creates queue items when users purchase packages
  2. Billing System: Records traffic usage and triggers expiration
  3. Node Management: Checks active packages for user access control
  4. Admin Interface: Views and manages user package queues

Best Practices

  1. Always use transactions when modifying package queue state
  2. Handle lock acquisition failures gracefully with retries
  3. Listen to package events for system integration
  4. Consider quota adjustments when calculating available traffic
  5. Test concurrent scenarios due to multi-user nature of the system

Common Pitfalls

  1. Race Conditions: Always use Redis locks when modifying queue state
  2. Transaction Boundaries: Ensure event publishing happens after database commits
  3. Zero-Duration Packages: Handle edge cases where packages expire immediately
  4. Quota Calculations: Remember that adjust_quota can be negative
  5. Event Ordering: Package events may arrive out of order in distributed systems