GitHunt
BY

bysiber/flashq

⚡ The task queue that works out of the box — no Redis, no RabbitMQ, just pip install and go.

⚡ FlashQ

The task queue that works out of the box — no Redis, no RabbitMQ, just pip install flashq and go.

CI
Python
License: MIT
Tests


Why FlashQ?

Every Python developer has been there: you need background tasks, you look at Celery, and suddenly you need Redis or RabbitMQ running, a separate broker config, and 200 lines of boilerplate before your first task runs.

FlashQ changes that. SQLite is the default backend — zero external dependencies, zero config. Your tasks persist across restarts, and you can scale to PostgreSQL or Redis when you need to.

from flashq import FlashQ

app = FlashQ()  # That's it. Uses SQLite by default.

@app.task()
def send_email(to: str, subject: str) -> None:
    print(f"Sending email to {to}: {subject}")

# Enqueue a task
send_email.delay(to="user@example.com", subject="Welcome!")

Features

Feature FlashQ Celery Dramatiq Huey TaskIQ
Zero-config setup ⚠️
SQLite backend
PostgreSQL backend ⚠️
Redis backend
Async + Sync tasks Async only
Type-safe .delay() ⚠️
Task chains/groups
Middleware system
Rate limiting
Web dashboard ⚠️
Dead letter queue
Task timeouts
Periodic/cron scheduler ⚠️ ⚠️
Zero dependencies

Installation

# Core (SQLite only — zero dependencies!)
pip install flashq

# With Redis
pip install "flashq[redis]"

# With PostgreSQL
pip install "flashq[postgres]"

# Development
pip install "flashq[dev]"

Try It Now!

See FlashQ in action with the built-in demo — zero config needed:

pip install flashq
python -m flashq.demo

You'll see 20 tasks being created, enqueued, processed by workers, and results printed in real-time.

Quick Start

1. Define tasks

# tasks.py
from flashq import FlashQ

app = FlashQ()

@app.task()
def add(x: int, y: int) -> int:
    return x + y

@app.task(queue="emails", max_retries=5, retry_delay=30.0)
def send_email(to: str, subject: str, body: str) -> dict:
    return {"status": "sent", "to": to}

@app.task(timeout=120.0)  # Kill if takes >2 min
async def process_image(url: str) -> str:
    # Async tasks just work™
    result = await download_and_resize(url)
    return result

2. Enqueue tasks

from tasks import add, send_email

# Simple dispatch
handle = add.delay(2, 3)

# With options
handle = send_email.apply(
    kwargs={"to": "user@example.com", "subject": "Hi", "body": "Hello!"},
    countdown=60,  # delay by 60 seconds
)

# Check result
result = handle.get_result()
if result and result.is_success:
    print(f"Result: {result.result}")

3. Start the worker

flashq worker tasks:app
 ⚡ FlashQ Worker
 ├─ name:        worker-12345
 ├─ backend:     SQLiteBackend
 ├─ queues:      default
 ├─ concurrency: 4
 ├─ tasks:       3
 │    └─ tasks.add
 │    └─ tasks.send_email
 │    └─ tasks.process_image
 └─ Ready! Waiting for tasks...

Task Composition

Chain tasks sequentially, run them in parallel, or combine both:

from flashq import chain, group, chord

# Chain: sequential — result of each passed to next
pipe = chain(
    download.s("https://example.com/data.csv"),
    parse_csv.s(),
    store_results.s(table="imports"),
)
pipe.delay(app)

# Group: parallel execution
batch = group(
    send_email.s(to="alice@test.com", subject="Hi"),
    send_email.s(to="bob@test.com", subject="Hi"),
    send_email.s(to="carol@test.com", subject="Hi"),
)
handle = batch.delay(app)
results = handle.get_results(timeout=30)

# Chord: parallel + callback when all complete
workflow = chord(
    group(fetch_price.s("AAPL"), fetch_price.s("GOOG"), fetch_price.s("MSFT")),
    aggregate_prices.s(),
)
workflow.delay(app)

Middleware

Intercept task lifecycle events for logging, monitoring, or custom logic:

from flashq import FlashQ, Middleware

class MetricsMiddleware(Middleware):
    def before_execute(self, message):
        self.start = time.time()
        return message

    def after_execute(self, message, result):
        duration = time.time() - self.start
        statsd.timing(f"task.{message.task_name}.duration", duration)

    def on_error(self, message, exc):
        sentry.capture_exception(exc)
        return False  # Don't suppress

    def on_dead(self, message, exc):
        alert_ops_team(f"Task {message.task_name} permanently failed: {exc}")

app = FlashQ()
app.add_middleware(MetricsMiddleware())

Built-in middlewares: LoggingMiddleware, TimeoutMiddleware, RateLimiter.

Rate Limiting

from flashq.ratelimit import RateLimiter

limiter = RateLimiter(default_rate="100/m")  # 100 tasks/minute global
limiter.configure("send_email", rate="10/m")  # 10 emails/minute
limiter.configure("api_call", rate="60/h")    # 60 API calls/hour

app.add_middleware(limiter)

Web Dashboard

Built-in monitoring UI — no extra services needed:

flashq dashboard myapp:app --port 5555

Open http://localhost:5555 to see:

  • Real-time task counts by state (pending, running, success, failure)
  • Task list with filtering by state, queue, and search
  • Task detail modal with args, result, error, traceback
  • Actions: cancel, revoke, purge queue
  • Auto-refreshing every 5 seconds
# Or embed in your own ASGI app
from flashq.dashboard import create_dashboard

dashboard = create_dashboard(app, prefix="/admin/tasks")
# Mount alongside your FastAPI/Starlette app

Dead Letter Queue

Inspect and replay permanently failed tasks:

from flashq.dlq import DeadLetterQueue

dlq = DeadLetterQueue(app)
app.add_middleware(dlq.middleware())  # Auto-capture dead tasks

# Later...
for task in dlq.list():
    print(f"{task.task_name}: {task.error}")

dlq.replay(task_id="abc123")  # Re-enqueue with reset retries
dlq.replay_all()              # Replay everything
dlq.purge()                   # Clear DLQ

Periodic Tasks

from flashq import FlashQ, every, cron
from flashq.scheduler import Scheduler

app = FlashQ()

@app.task(name="cleanup")
def cleanup_old_data():
    delete_old_records(days=30)

@app.task(name="daily_report")
def daily_report():
    generate_and_send_report()

scheduler = Scheduler(app)
scheduler.add("cleanup", every(hours=6))
scheduler.add("daily_report", cron("0 9 * * 1-5"))  # 9 AM weekdays
scheduler.start()

Retry & Error Handling

@app.task(max_retries=5, retry_delay=30.0, retry_backoff=True)
def flaky_task():
    # Retries: 30s → 60s → 120s → 240s → 480s (exponential backoff)
    response = requests.get("https://unreliable-api.com")
    response.raise_for_status()
from flashq.exceptions import TaskRetryError

@app.task(max_retries=10)
def smart_retry():
    try:
        do_something()
    except TemporaryError:
        raise TaskRetryError(countdown=5.0)  # Custom retry delay

Backends

SQLite (Default — Zero Config)

app = FlashQ()  # Creates flashq.db in current dir
app = FlashQ(backend=SQLiteBackend(path="/var/lib/flashq/tasks.db"))
app = FlashQ(backend=SQLiteBackend(path=":memory:"))  # For testing

PostgreSQL

Uses LISTEN/NOTIFY for instant task delivery + FOR UPDATE SKIP LOCKED for atomic dequeue.

from flashq.backends.postgres import PostgresBackend
app = FlashQ(backend=PostgresBackend("postgresql://localhost/mydb"))

Redis

Uses sorted sets for scheduling and Lua scripts for atomic operations.

from flashq.backends.redis import RedisBackend
app = FlashQ(backend=RedisBackend("redis://localhost:6379/0"))

CLI

flashq worker myapp:app                  # Start worker
flashq worker myapp:app -q emails,sms    # Specific queues
flashq worker myapp:app -c 16            # 16 concurrent threads
flashq dashboard myapp:app               # Start web dashboard
flashq dashboard myapp:app -p 8080       # Custom port
flashq info myapp:app                    # Queue stats
flashq purge myapp:app -f                # Purge queue

Benchmarks

Measured on Apple Silicon (M-series), Python 3.12, SQLite backend:

Benchmark Tasks Time Throughput Avg Latency
Enqueue (write only) 10,000 0.52s 19,298/s 0.05ms
Roundtrip (c=4) 1,000 25.7s 39/s 0.02ms
I/O-bound (c=8) 500 6.7s 75/s 11.6ms
CPU-bound (c=4) 500 12.8s 39/s 0.03ms

Concurrency scaling (500 tasks):

Workers Throughput
1 10 tasks/s
2 19 tasks/s
4 38 tasks/s
8 100 tasks/s

💡 Throughput scales linearly with concurrency. For I/O-bound workloads, use higher concurrency.

Run benchmarks yourself:

python benchmarks/bench.py

Architecture

Your App → FlashQ → Backend (SQLite/PG/Redis) → Worker(s)
                ↕
          Middleware Stack
          Rate Limiter
          Scheduler
          Dead Letter Queue

FlashQ uses a clean, modular architecture:

  • Backend: Pluggable storage (SQLite, PostgreSQL, Redis)
  • Worker: Thread pool executor with graceful shutdown
  • Middleware: Intercepts every stage of task lifecycle
  • Scheduler: Interval and cron-based periodic dispatch
  • Canvas: Task composition (chain, group, chord)

Roadmap

  • Core engine with SQLite backend
  • PostgreSQL backend (LISTEN/NOTIFY)
  • Redis backend (Lua scripts)
  • Task timeouts (non-blocking)
  • Middleware system
  • Rate limiting (token bucket)
  • Dead letter queue
  • Task chains, groups, chords
  • Periodic/cron scheduler
  • CLI (worker, info, purge)
  • 240 tests, 95% core coverage
  • Web dashboard (real-time monitoring)
  • Task result streaming
  • PyPI publish

Contributing

See CONTRIBUTING.md for development setup and guidelines.

License

MIT License. See LICENSE for details.

bysiber/flashq | GitHunt