10/2 Job Queues with RQ

GitHub Classroom Link: https://classroom.github.com/a/u5fVE7dn

This lab is standalone and doesn’t require the Jarvis monorepo. We’ll build a simple image processing service from scratch.

In this lab, we’ll build a simple job queue system using RQ (Redis Queue). Job queues are essential for handling time-consuming tasks asynchronously - imagine sending emails, processing images, or generating reports. Instead of making users wait, we can offload these tasks to background workers.

What is RQ?

RQ is a simple Python library for queueing jobs and processing them in the background with workers. It’s backed by Redis, an in-memory data store. RQ is essentially a nice set of tools wrapped around an existing Redis store to give us job queue functionality. RQ’s architecture is similar to what we went over in class:

  1. Producer: Enqueues jobs to be processed (creates jobs and adds them to the queue)
  2. Queue/Broker (Redis): Stores jobs waiting to be processed
  3. Worker: Picks up jobs from the queue and executes them

Redis also serves as an optional Result Store where job results and status are saved.
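
To make this concrete, here is a minimal producer-side sketch (assuming Redis is running locally on the default port 6379; process_image is the job function we’ll look at next):

from redis import Redis
from rq import Queue

from tasks import process_image

# The broker: a plain Redis connection (assumes localhost:6379)
redis_conn = Redis(host="localhost", port=6379)

# The default queue, backed by that Redis instance
queue = Queue(connection=redis_conn)

# Enqueue a job: this returns immediately with a Job handle,
# while a worker process runs process_image in the background
job = queue.enqueue(process_image, "upenn.jpg", "upenn_blurred.jpg", blur_radius=5)
print(job.id, job.get_status())  # e.g. "<uuid> queued"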

Job Definitions

In the repository, we’ve included a simple image processing job. Jobs in RQ are just Python functions. Take a look at tasks.py:

import time
from PIL import Image, ImageFilter


def process_image(filename: str, output_filename: str, blur_radius: int = 5):
    """
    Simulate a long-running image processing task.
    Applies a blur filter to an image.
    """
    print(f"Starting to process {filename}...")

    # Simulate some processing time
    time.sleep(3)

    # Open and process the image
    img = Image.open(filename)
    blurred = img.filter(ImageFilter.GaussianBlur(radius=blur_radius))

    # Save the result
    blurred.save(output_filename)

    print(f"Finished processing {filename} -> {output_filename}")
    return output_filename

This is just a regular Python function - nothing special about it! The key thing to note is that it’s a time-consuming operation you don’t want to run synchronously within your web API.

Web API

We’ve included a FastAPI server that acts as our Producer - it enqueues jobs for workers to process. Take a look at api.py. There are three main endpoints: one to enqueue a job to process an image, one to get the status of a job, and one to get the status of the entire queue.
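
As a rough sketch of what the enqueue endpoint might look like (illustrative only - names and details may differ from the actual api.py):

from fastapi import FastAPI
from pydantic import BaseModel
from redis import Redis
from rq import Queue

from tasks import process_image

app = FastAPI()
queue = Queue(connection=Redis(host="localhost", port=6379))


class ProcessImageRequest(BaseModel):
    filename: str
    output_filename: str
    blur_radius: int = 5


@app.post("/jobs/process-image")
def enqueue_process_image(req: ProcessImageRequest):
    # Enqueue and return immediately; the worker does the slow part
    job = queue.enqueue(process_image, req.filename, req.output_filename, req.blur_radius)
    return {"job_id": job.id, "status": job.get_status(), "message": "Job enqueued successfully"}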

Running the System

Now we need three terminal windows to see everything in action. First, start Docker Compose in detached mode. This will spin up the Redis instance:

docker compose up -d

Terminal 1 - Start the API server:

uv run fastapi dev api.py --host 0.0.0.0 --port 8000

Visit http://localhost:8000/docs to see the API documentation.

Terminal 2 - Start an RQ worker:

uv run rq worker

You should see output like:

Worker rq:worker:hostname.12345 started with PID 12345, version 1.15.0
Subscribing to channel rq:pubsub:12345

Note: if you needed to schedule jobs (e.g., run this every day at noon), you’d start the worker with the --with-scheduler flag.
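
For example, RQ’s queues have an enqueue_in helper for deferred jobs (a sketch, reusing the queue object from the earlier snippet):

from datetime import timedelta

# Run process_image 10 minutes from now; this only fires if the worker
# was started with --with-scheduler
job = queue.enqueue_in(timedelta(minutes=10), process_image, "upenn.jpg", "upenn_later.jpg")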

Terminal 3 - Test the system:

We’ve provided you with an upenn.jpg to test with. Feel free to use this image or any other image you want! Let’s enqueue a job to blur this image using curl:

curl -X POST http://localhost:8000/jobs/process-image \
  -H "Content-Type: application/json" \
  -d '{
    "filename": "upenn.jpg",
    "output_filename": "upenn_blurred.jpg",
    "blur_radius": 10
  }'

You should get a response immediately like:

{
  "job_id": "fbc236a8-a22e-4121-aa8d-85a716bd0b9c",
  "status": "queued",
  "message": "Job enqueued successfully"
}

Watch your worker terminal (Terminal 2) - you’ll see it pick up and process the job! After 3 seconds or so, you should see the upenn_blurred.jpg show up in your directory. You can check the status of the job:

curl http://localhost:8000/jobs/{job_id}

Replace {job_id} with the actual job ID you received.
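
Under the hood, a status endpoint like this can fetch the job straight out of Redis by its ID. A minimal sketch (illustrative, not necessarily the real api.py), reusing app and queue from the earlier sketch:

from rq.job import Job

@app.get("/jobs/{job_id}")
def job_status(job_id: str):
    # Look the job up in Redis by its ID
    job = Job.fetch(job_id, connection=queue.connection)
    return {
        "job_id": job.id,
        "status": job.get_status(),  # queued / started / finished / failed
        "result": job.result,        # return value once finished, else None
    }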

Understanding Job States

Jobs in RQ go through several states:

  • queued: Job is waiting to be processed
  • started: Worker has picked up the job and is processing it
  • finished: Job completed successfully
  • failed: Job encountered an error

You can check the queue status at any time with the provided endpoint we built:

curl http://localhost:8000/queue/info
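
A queue-info endpoint can be built from the Queue object’s own introspection helpers (a sketch, again reusing app and queue from above):

@app.get("/queue/info")
def queue_info():
    return {
        "name": queue.name,         # "default"
        "queued_jobs": len(queue),  # number of jobs waiting
        "job_ids": queue.job_ids,   # IDs of the queued jobs
    }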

Tasks

Now that you understand the basics, let’s extend the system:

Task 1: Add Priority Queues

RQ supports multiple queues with different priorities. Create a high-priority queue for urgent jobs; a sketch follows the steps below. You may find the RQ documentation useful.

  1. Modify api.py to create two queues: default and high-priority
  2. Add a priority parameter to your job endpoints
  3. Start a worker that processes the high-priority queue first. The order in which you pass queues to the worker is the order in which it prioritizes them.
uv run rq worker high-priority default
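
One way to wire this up on the producer side (a sketch; the priority parameter name is an assumption, not part of the starter code):

high_queue = Queue("high-priority", connection=redis_conn)
default_queue = Queue("default", connection=redis_conn)

@app.post("/jobs/process-image")
def enqueue_process_image(req: ProcessImageRequest, priority: str = "default"):
    # Route urgent jobs to the high-priority queue
    q = high_queue if priority == "high" else default_queue
    job = q.enqueue(process_image, req.filename, req.output_filename, req.blur_radius)
    return {"job_id": job.id, "queue": q.name, "status": job.get_status()}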

Task 2: Add Job Retry Logic

Add retry logic for failed jobs (a sketch follows these steps):

  1. Create a job that randomly fails (use random.random() < 0.5 to simulate failure)
  2. Add an endpoint to submit this new job. Use RQ’s retry parameter when enqueuing to automatically retry failed jobs. Retry up to any number of times you wish.
  3. Test and see how RQ automatically retries jobs!
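
RQ exposes retries through the rq.Retry helper passed at enqueue time. A sketch of both pieces (the flaky_task name is illustrative):

import random
from rq import Retry

# Place flaky_task in tasks.py so the worker can import it
def flaky_task():
    """Fails about half the time to exercise RQ's retry machinery."""
    if random.random() < 0.5:
        raise RuntimeError("Simulated transient failure")
    return "success"

# Retry up to 3 times; interval sets the delay (in seconds) before each retry
job = queue.enqueue(flaky_task, retry=Retry(max=3, interval=[10, 30, 60]))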

[OPTIONAL] Task 3: Docker Compose

Add both the API server and the worker to the Docker Compose file. You may need to change some hard-coded values in api.py.
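
The usual hard-coded value is the Redis host: inside Compose, the API container reaches Redis by service name rather than localhost. A common pattern (a sketch; REDIS_HOST is an assumed variable name, not something the starter code defines):

import os
from redis import Redis
from rq import Queue

# Fall back to localhost for local dev; override REDIS_HOST in docker-compose.yml
redis_conn = Redis(host=os.environ.get("REDIS_HOST", "localhost"), port=6379)
queue = Queue(connection=redis_conn)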

[OPTIONAL] Task 4: Monitoring with RQ Dashboard

RQ comes with a web-based monitoring dashboard. Install and run it:

uv pip install rq-dashboard
uv run rq-dashboard

Visit http://localhost:9181 to see a real-time view of your queues, workers, and jobs.

Cleanup

When you’re done, stop the Docker Compose services:

docker compose down
