# ⚡ FlashQ

The task queue that works out of the box — no Redis, no RabbitMQ, just `pip install flashq` and go.
## Why FlashQ?
Every Python developer has been there: you need background tasks, you look at Celery, and suddenly you need Redis or RabbitMQ running, a separate broker config, and 200 lines of boilerplate before your first task runs.
FlashQ changes that. SQLite is the default backend — zero external dependencies, zero config. Your tasks persist across restarts, and you can scale to PostgreSQL or Redis when you need to.
```python
from flashq import FlashQ

app = FlashQ()  # That's it. Uses SQLite by default.

@app.task()
def send_email(to: str, subject: str) -> None:
    print(f"Sending email to {to}: {subject}")

# Enqueue a task
send_email.delay(to="user@example.com", subject="Welcome!")
```

## Features
| Feature | FlashQ | Celery | Dramatiq | Huey | TaskIQ |
|---|---|---|---|---|---|
| Zero-config setup | ✅ | ❌ | ❌ | ❌ | |
| SQLite backend | ✅ | ❌ | ❌ | ✅ | ❌ |
| PostgreSQL backend | ✅ | ❌ | ❌ | ❌ | |
| Redis backend | ✅ | ✅ | ✅ | ✅ | ✅ |
| Async + Sync tasks | ✅ | ❌ | ❌ | ❌ | Async only |
| Type-safe `.delay()` | ✅ | ❌ | ❌ | ✅ | |
| Task chains/groups | ✅ | ✅ | ❌ | ❌ | ❌ |
| Middleware system | ✅ | ✅ | ✅ | ❌ | ✅ |
| Rate limiting | ✅ | ✅ | ❌ | ❌ | ❌ |
| Web dashboard | ✅ | ❌ | ❌ | ❌ | |
| Dead letter queue | ✅ | ❌ | ❌ | ❌ | ❌ |
| Task timeouts | ✅ | ✅ | ✅ | ❌ | ✅ |
| Periodic/cron scheduler | ✅ | ❌ | ✅ | | |
| Zero dependencies | ✅ | ❌ | ❌ | ❌ | ❌ |
## Installation
```bash
# Core (SQLite only — zero dependencies!)
pip install flashq

# With Redis
pip install "flashq[redis]"

# With PostgreSQL
pip install "flashq[postgres]"

# Development
pip install "flashq[dev]"
```

## Try It Now!
See FlashQ in action with the built-in demo — zero config needed:
```bash
pip install flashq
python -m flashq.demo
```

You'll see 20 tasks created, enqueued, processed by workers, and their results printed in real time.
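Conceptually, a demo like this is just a producer feeding a worker pool and collecting results. A dependency-free sketch of that loop (plain `queue` + `threading`; this is not FlashQ's actual demo code):

```python
import queue
import threading

def run_demo(n_tasks=20, n_workers=4):
    """Producer/worker-pool loop: enqueue n_tasks, process them concurrently."""
    tasks, results = queue.Queue(), []
    lock = threading.Lock()

    def worker():
        while True:
            item = tasks.get()
            if item is None:  # poison pill: shut this worker down
                return
            with lock:
                results.append(item * 2)  # "process" the task
            tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for i in range(n_tasks):
        tasks.put(i)
    tasks.join()  # wait until every task has been processed
    for _ in threads:
        tasks.put(None)
    for t in threads:
        t.join()
    return results

print(run_demo())
```

FlashQ's real workers layer persistence, retries, and middleware on top of this basic pattern.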
## Quick Start
### 1. Define tasks
```python
# tasks.py
from flashq import FlashQ

app = FlashQ()

@app.task()
def add(x: int, y: int) -> int:
    return x + y

@app.task(queue="emails", max_retries=5, retry_delay=30.0)
def send_email(to: str, subject: str, body: str) -> dict:
    return {"status": "sent", "to": to}

@app.task(timeout=120.0)  # Kill if it takes >2 min
async def process_image(url: str) -> str:
    # Async tasks just work™
    result = await download_and_resize(url)
    return result
```

### 2. Enqueue tasks
```python
from tasks import add, send_email

# Simple dispatch
handle = add.delay(2, 3)

# With options
handle = send_email.apply(
    kwargs={"to": "user@example.com", "subject": "Hi", "body": "Hello!"},
    countdown=60,  # delay by 60 seconds
)

# Check result
result = handle.get_result()
if result and result.is_success:
    print(f"Result: {result.result}")
```

### 3. Start the worker
```bash
flashq worker tasks:app
```

```
⚡ FlashQ Worker
├─ name: worker-12345
├─ backend: SQLiteBackend
├─ queues: default
├─ concurrency: 4
├─ tasks: 3
│  ├─ tasks.add
│  ├─ tasks.send_email
│  └─ tasks.process_image
└─ Ready! Waiting for tasks...
```
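The check in step 2 suggests `get_result()` returns a falsy value while the task is still pending. When a script needs to block until a result arrives, a small generic poller does the job (pure Python, a hypothetical helper, not part of FlashQ's API):

```python
import time

def wait_for(get, timeout=10.0, interval=0.1):
    """Poll get() until it returns a non-None value, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        value = get()
        if value is not None:
            return value
        time.sleep(interval)
    raise TimeoutError("no result within timeout")
```

Usage with the handle from step 2 would look like `result = wait_for(handle.get_result, timeout=30)`.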
## Task Composition
Chain tasks sequentially, run them in parallel, or combine both:
```python
from flashq import chain, group, chord

# Chain: sequential — result of each task is passed to the next
pipe = chain(
    download.s("https://example.com/data.csv"),
    parse_csv.s(),
    store_results.s(table="imports"),
)
pipe.delay(app)

# Group: parallel execution
batch = group(
    send_email.s(to="alice@test.com", subject="Hi"),
    send_email.s(to="bob@test.com", subject="Hi"),
    send_email.s(to="carol@test.com", subject="Hi"),
)
handle = batch.delay(app)
results = handle.get_results(timeout=30)

# Chord: parallel group + callback when all complete
workflow = chord(
    group(fetch_price.s("AAPL"), fetch_price.s("GOOG"), fetch_price.s("MSFT")),
    aggregate_prices.s(),
)
workflow.delay(app)
```

## Middleware
Intercept task lifecycle events for logging, monitoring, or custom logic:
```python
import time

from flashq import FlashQ, Middleware

class MetricsMiddleware(Middleware):
    # statsd, sentry, and alert_ops_team are your own monitoring clients
    def before_execute(self, message):
        self.start = time.time()
        return message

    def after_execute(self, message, result):
        duration = time.time() - self.start
        statsd.timing(f"task.{message.task_name}.duration", duration)

    def on_error(self, message, exc):
        sentry.capture_exception(exc)
        return False  # Don't suppress the exception

    def on_dead(self, message, exc):
        alert_ops_team(f"Task {message.task_name} permanently failed: {exc}")

app = FlashQ()
app.add_middleware(MetricsMiddleware())
```

Built-in middlewares: `LoggingMiddleware`, `TimeoutMiddleware`, `RateLimiter`.
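Hooks like these typically wrap task execution in an onion-style stack: `before_execute` runs outward-in, `after_execute` unwinds in reverse. A dependency-free sketch of that dispatch pattern (illustrative only; FlashQ's actual internals may differ):

```python
class Middleware:
    """Base hooks, mirroring the shape shown above (sketch)."""
    def before_execute(self, message):
        return message  # may transform or replace the message

    def after_execute(self, message, result):
        pass

    def on_error(self, message, exc):
        return False  # return True to suppress the exception

def run_with_middleware(middlewares, func, message):
    # Outward-in: each middleware sees (and may rewrite) the message
    for mw in middlewares:
        message = mw.before_execute(message)
    try:
        result = func(message)
    except Exception as exc:
        # Every middleware observes the error; any one of them may suppress it
        suppressed = [mw.on_error(message, exc) for mw in middlewares]
        if not any(suppressed):
            raise
        return None
    # Unwind in reverse order, like nested decorators
    for mw in reversed(middlewares):
        mw.after_execute(message, result)
    return result
```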
## Rate Limiting
```python
from flashq.ratelimit import RateLimiter

limiter = RateLimiter(default_rate="100/m")   # 100 tasks/minute globally
limiter.configure("send_email", rate="10/m")  # 10 emails/minute
limiter.configure("api_call", rate="60/h")    # 60 API calls/hour

app.add_middleware(limiter)
```

## Web Dashboard
Built-in monitoring UI — no extra services needed:
```bash
flashq dashboard myapp:app --port 5555
```

Open http://localhost:5555 to see:
- Real-time task counts by state (pending, running, success, failure)
- Task list with filtering by state, queue, and search
- Task detail modal with args, result, error, traceback
- Actions: cancel, revoke, purge queue
- Auto-refreshing every 5 seconds
```python
# Or embed in your own ASGI app
from flashq.dashboard import create_dashboard

dashboard = create_dashboard(app, prefix="/admin/tasks")
# Mount alongside your FastAPI/Starlette app
```

## Dead Letter Queue
Inspect and replay permanently failed tasks:
```python
from flashq.dlq import DeadLetterQueue

dlq = DeadLetterQueue(app)
app.add_middleware(dlq.middleware())  # Auto-capture dead tasks

# Later...
for task in dlq.list():
    print(f"{task.task_name}: {task.error}")

dlq.replay(task_id="abc123")  # Re-enqueue with reset retries
dlq.replay_all()              # Replay everything
dlq.purge()                   # Clear the DLQ
```

## Periodic Tasks
```python
from flashq import FlashQ, every, cron
from flashq.scheduler import Scheduler

app = FlashQ()

@app.task(name="cleanup")
def cleanup_old_data():
    delete_old_records(days=30)

@app.task(name="daily_report")
def daily_report():
    generate_and_send_report()

scheduler = Scheduler(app)
scheduler.add("cleanup", every(hours=6))
scheduler.add("daily_report", cron("0 9 * * 1-5"))  # 9 AM on weekdays
scheduler.start()
```

## Retry & Error Handling
```python
import requests

@app.task(max_retries=5, retry_delay=30.0, retry_backoff=True)
def flaky_task():
    # Retries: 30s → 60s → 120s → 240s → 480s (exponential backoff)
    response = requests.get("https://unreliable-api.com")
    response.raise_for_status()
```

Raise `TaskRetryError` to control the retry delay from inside a task:

```python
from flashq.exceptions import TaskRetryError

@app.task(max_retries=10)
def smart_retry():
    try:
        do_something()
    except TemporaryError:
        raise TaskRetryError(countdown=5.0)  # Custom retry delay
```

## Backends
### SQLite (Default — Zero Config)
```python
from flashq.backends.sqlite import SQLiteBackend

app = FlashQ()  # Creates flashq.db in the current directory
app = FlashQ(backend=SQLiteBackend(path="/var/lib/flashq/tasks.db"))
app = FlashQ(backend=SQLiteBackend(path=":memory:"))  # For testing
```

### PostgreSQL
Uses `LISTEN/NOTIFY` for instant task delivery and `FOR UPDATE SKIP LOCKED` for atomic dequeue.

```python
from flashq.backends.postgres import PostgresBackend

app = FlashQ(backend=PostgresBackend("postgresql://localhost/mydb"))
```

### Redis
Uses sorted sets for scheduling and Lua scripts for atomic operations.

```python
from flashq.backends.redis import RedisBackend

app = FlashQ(backend=RedisBackend("redis://localhost:6379/0"))
```

## CLI
```bash
flashq worker myapp:app                # Start a worker
flashq worker myapp:app -q emails,sms  # Consume specific queues
flashq worker myapp:app -c 16          # 16 concurrent threads
flashq dashboard myapp:app             # Start the web dashboard
flashq dashboard myapp:app -p 8080     # Custom port
flashq info myapp:app                  # Queue stats
flashq purge myapp:app -f              # Purge a queue
```

## Benchmarks
Measured on Apple Silicon (M-series), Python 3.12, SQLite backend:
| Benchmark | Tasks | Time | Throughput | Avg Latency |
|---|---|---|---|---|
| Enqueue (write only) | 10,000 | 0.52s | 19,298/s | 0.05ms |
| Roundtrip (c=4) | 1,000 | 25.7s | 39/s | 0.02ms |
| I/O-bound (c=8) | 500 | 6.7s | 75/s | 11.6ms |
| CPU-bound (c=4) | 500 | 12.8s | 39/s | 0.03ms |
Concurrency scaling (500 tasks):
| Workers | Throughput |
|---|---|
| 1 | 10 tasks/s |
| 2 | 19 tasks/s |
| 4 | 38 tasks/s |
| 8 | 100 tasks/s |
💡 Throughput scales roughly linearly with worker count. For I/O-bound workloads, use higher concurrency.
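For a quick sanity check of numbers like these on your own machine, a bare-bones timing harness is enough (a generic sketch; the table above comes from the project's own `benchmarks/bench.py`, not from this code):

```python
import time

def measure_throughput(fn, n=10_000):
    """Call fn() n times and return the observed calls per second."""
    start = time.perf_counter()
    for _ in range(n):
        fn()
    elapsed = time.perf_counter() - start
    return n / elapsed
```

Pass it a closure that enqueues one task to approximate the "Enqueue (write only)" row.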
Run the benchmarks yourself:

```bash
python benchmarks/bench.py
```

## Architecture
```
Your App → FlashQ → Backend (SQLite/PG/Redis) → Worker(s)
                        ↕
                Middleware Stack
                Rate Limiter
                Scheduler
                Dead Letter Queue
```
FlashQ uses a clean, modular architecture:
- Backend: Pluggable storage (SQLite, PostgreSQL, Redis)
- Worker: Thread pool executor with graceful shutdown
- Middleware: Intercepts every stage of task lifecycle
- Scheduler: Interval and cron-based periodic dispatch
- Canvas: Task composition (chain, group, chord)
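The Backend role above can be pictured with a toy in-memory implementation: pending tasks sit in a priority queue keyed by their due time, much as the Redis backend uses sorted sets for scheduling. Purely illustrative: the method names and the `eta` parameter are hypothetical, not FlashQ's real interface:

```python
import heapq
import time

class InMemoryBackend:
    """Toy backend: a priority queue of (due_time, seq, message) tuples."""
    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so equal ETAs dequeue FIFO

    def enqueue(self, message, eta=None):
        """Store a message; eta (unix time) defers it into the future."""
        self._seq += 1
        due = eta if eta is not None else time.time()
        heapq.heappush(self._heap, (due, self._seq, message))

    def dequeue(self):
        """Hand out the earliest message whose ETA has passed, else None."""
        if self._heap and self._heap[0][0] <= time.time():
            return heapq.heappop(self._heap)[2]
        return None
```

A real backend adds what this toy omits: durability across restarts and atomic claims so two workers never take the same message.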
## Roadmap

- Core engine with SQLite backend
- PostgreSQL backend (LISTEN/NOTIFY)
- Redis backend (Lua scripts)
- Task timeouts (non-blocking)
- Middleware system
- Rate limiting (token bucket)
- Dead letter queue
- Task chains, groups, chords
- Periodic/cron scheduler
- CLI (worker, info, purge)
- 240 tests, 95% core coverage
- Web dashboard (real-time monitoring)
- Task result streaming
- PyPI publish
## Contributing
See CONTRIBUTING.md for development setup and guidelines.
## License
MIT License. See LICENSE for details.