GitHub Classroom Link: https://classroom.github.com/a/u5fVE7dn
This lab is standalone and doesn’t require the Jarvis monorepo. We’ll build a simple image processing service from scratch.
In this lab, we’ll build a simple job queue system using RQ (Redis Queue). Job queues are essential for handling time-consuming tasks asynchronously - imagine sending emails, processing images, or generating reports. Instead of making users wait, we can offload these tasks to background workers.
What is RQ?
RQ is a simple Python library for queueing jobs and processing them in the background with workers. It’s backed by Redis, an in-memory data store. RQ is simply a nice set of tools that wraps around an existing Redis store to give us job queue functionality. RQ’s architecture is similar to what we went over in class:
- Producer: Enqueues jobs to be processed (creates jobs and adds them to the queue)
- Queue/Broker (Redis): Stores jobs waiting to be processed
- Worker: Picks up jobs from the queue and executes them
Redis also serves as an optional Result Store where job results and status are saved.
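To make these roles concrete, here’s a minimal sketch of the producer side (a sketch, assuming a local Redis on the default port; the add function and myjobs.py module are purely illustrative — any function the worker can import will do):

from redis import Redis
from rq import Queue

# Suppose this function lives in an importable module, e.g. myjobs.py
def add(x, y):
    return x + y

queue = Queue(connection=Redis())  # Redis acts as the broker (the "default" queue)

# Producer: enqueue the job. A worker process, started separately with
# `rq worker`, picks it up, runs it, and writes the result back to Redis.
job = queue.enqueue(add, 2, 3)
print(job.get_status())  # "queued" until a worker picks it up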
Job Definitions
In the repository, we’ve included a simple image processing job. Jobs in RQ are just Python functions. Take a look at tasks.py:
import time
from PIL import Image, ImageFilter
import os


def process_image(filename: str, output_filename: str, blur_radius: int = 5):
    """
    Simulate a long-running image processing task.
    Applies a blur filter to an image.
    """
    print(f"Starting to process {filename}...")

    # Simulate some processing time
    time.sleep(3)

    # Open and process the image
    img = Image.open(filename)
    blurred = img.filter(ImageFilter.GaussianBlur(radius=blur_radius))

    # Save the result
    blurred.save(output_filename)

    print(f"Finished processing {filename} -> {output_filename}")
    return output_filename
This is just a regular Python function - nothing special about it! The key thing to note is that it’s a time-consuming operation you don’t want to run synchronously inside your web API.
Web API
We’ve included a FastAPI server that acts as our Producer - it will enqueue jobs for workers to process. Take a look at api.py. There are three main endpoints: one to enqueue an image-processing job, one to get the status of a job, and one to get the status of the entire queue.
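To give a feel for the producer side, here’s a hedged sketch of what those endpoints might look like (the endpoint shapes and names are illustrative; the actual api.py in the repo may differ):

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from redis import Redis
from rq import Queue
from rq.exceptions import NoSuchJobError
from rq.job import Job

from tasks import process_image

app = FastAPI()
redis_conn = Redis(host="localhost", port=6379)
queue = Queue(connection=redis_conn)

class ProcessImageRequest(BaseModel):
    filename: str
    output_filename: str
    blur_radius: int = 5

@app.post("/jobs/process-image")
def enqueue_job(req: ProcessImageRequest):
    # Enqueue and return immediately; a worker does the actual processing
    job = queue.enqueue(process_image, req.filename, req.output_filename, req.blur_radius)
    return {"job_id": job.id, "status": job.get_status(), "message": "Job enqueued successfully"}

@app.get("/jobs/{job_id}")
def job_status(job_id: str):
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except NoSuchJobError:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"job_id": job.id, "status": job.get_status(), "result": job.return_value()}

@app.get("/queue/info")
def queue_info():
    # len(queue) counts jobs still waiting; registries track started/failed jobs
    return {"queued": len(queue), "failed": queue.failed_job_registry.count}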
Running the System
Now we need three terminal windows to see everything in action. First, start Docker Compose in detached mode to spin up the Redis instance:
docker compose up -d
Terminal 1 - Start the API server:
uv run fastapi dev api.py --host 0.0.0.0 --port 8000
Visit http://localhost:8000/docs to see the API documentation.
Terminal 2 - Start an RQ worker:
uv run rq worker
You should see output like:
Worker rq:worker:hostname.12345 started with PID 12345, version 1.15.0
Subscribing to channel rq:pubsub:12345
Note: if you needed to schedule jobs, e.g. run a task every day at noon, you’d start the worker with the --with-scheduler flag.
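For example, with the scheduler enabled you could ask RQ to run a job later (a sketch; RQ also offers enqueue_at for absolute times):

from datetime import timedelta

from redis import Redis
from rq import Queue

from tasks import process_image

queue = Queue(connection=Redis())

# Runs roughly 24 hours from now; requires a worker started with --with-scheduler
queue.enqueue_in(timedelta(hours=24), process_image, "upenn.jpg", "upenn_blurred.jpg")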
Terminal 3 - Test the system:
We’ve provided you with an upenn.jpg to test with. Feel free to use this image or any other image you want! Let’s enqueue a job to blur this image using curl:
curl -X POST http://localhost:8000/jobs/process-image \
  -H "Content-Type: application/json" \
  -d '{
    "filename": "upenn.jpg",
    "output_filename": "upenn_blurred.jpg",
    "blur_radius": 10
  }'
You should get a response immediately like:
{
  "job_id": "fbc236a8-a22e-4121-aa8d-85a716bd0b9c",
  "status": "queued",
  "message": "Job enqueued successfully"
}
Watch your worker terminal (Terminal 2) - you’ll see it pick up and process the job! After about 3 seconds, upenn_blurred.jpg should appear in your directory. You can check the status of the job:
curl http://localhost:8000/jobs/{job_id}
Replace {job_id} with the actual job ID you received.
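You can also inspect a job directly from a Python shell via RQ’s Job API (a quick sketch; assumes the Redis instance from docker compose is running):

from redis import Redis
from rq.job import Job

# Paste the job ID you received from the API
job = Job.fetch("<job_id>", connection=Redis())
print(job.get_status())    # "queued", "started", "finished", or "failed"
print(job.return_value())  # the function's return value once finished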
Understanding Job States
Jobs in RQ go through several states:
- queued: Job is waiting to be processed
- started: Worker has picked up the job and is processing it
- finished: Job completed successfully
- failed: Job encountered an error
You can check the queue status at any time with the endpoint we provided:
curl http://localhost:8000/queue/info
Tasks
Now that you understand the basics, let’s extend the system:
Task 1: Add Priority Queues
RQ supports multiple queues with different priorities. Create a high-priority queue for urgent jobs. You may find the RQ documentation useful.
- Modify api.py to create two queues: default and high-priority
- Add a priority parameter to your job endpoints (see the sketch after this list)
- Start a worker that processes the high-priority queue first. The order in which you pass queues to the worker is the order in which it will prioritize them:

uv run rq worker high-priority default
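Here’s a hedged sketch of the two-queue setup (the enqueue_with_priority helper and its fallback behavior are illustrative, not the required solution):

from redis import Redis
from rq import Queue

redis_conn = Redis(host="localhost", port=6379)
queues = {
    "default": Queue("default", connection=redis_conn),
    "high-priority": Queue("high-priority", connection=redis_conn),
}

def enqueue_with_priority(func, *args, priority: str = "default"):
    # Unknown priority values fall back to the default queue
    return queues.get(priority, queues["default"]).enqueue(func, *args)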
Task 2: Add Job Retry Logic
Add retry logic for failed jobs:
- Create a job that randomly fails (use random.random() < 0.5 to simulate failure)
- Add an endpoint to submit this new job. Use RQ’s retry parameter when enqueuing to automatically retry failed jobs; retry up to as many times as you like (a sketch follows below)
- Test and see how RQ automatically retries jobs!
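A possible sketch of the flaky job and a retry-enabled enqueue (the failure rate and retry counts are just examples; the job function should live somewhere the worker can import, e.g. tasks.py):

import random

from redis import Redis
from rq import Queue, Retry

def flaky_task():
    # Fails roughly half the time to exercise RQ's retry machinery
    if random.random() < 0.5:
        raise RuntimeError("Simulated random failure")
    return "success"

queue = Queue(connection=Redis())
# Retry up to 3 times, waiting 10 seconds between attempts
job = queue.enqueue(flaky_task, retry=Retry(max=3, interval=10))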
[OPTIONAL] Task 3: Docker Compose
Add both the API server and the worker to the Docker Compose file. You may need to change some hard-coded values in api.py.
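One likely hard-coded value is the Redis host: inside a container, localhost refers to the container itself rather than the Redis service. A common fix (a sketch; the variable name is your choice) is to read the host from the environment:

import os

from redis import Redis

# In docker-compose, set REDIS_HOST to the Redis service name (e.g. "redis")
redis_conn = Redis(host=os.environ.get("REDIS_HOST", "localhost"), port=6379)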
[OPTIONAL] Task 4: Monitoring with RQ Dashboard
RQ comes with a web-based monitoring dashboard. Install and run it:
uv pip install rq-dashboard
uv run rq-dashboard
Visit http://localhost:9181 to see a real-time view of your queues, workers, and jobs.
Cleanup
When you’re done, stop the Docker Compose services:
docker compose down