Aman Goyal


Work Queue Systems: Scalable Batch Processing with Independent Tasks

Core Concept

A work queue processes a large batch of independent work items: a generic queue manager discovers items, tracks which ones have been handled, and launches a worker job for each unprocessed item. Because the manager knows nothing about what the work actually is, the same manager can drive many different workloads.
Architecture Overview

Strong separation between the generic and the application-specific parts:

- The queue manager: generic, reusable orchestration logic.
- The source: application-specific, enumerates the work items.
- The worker: application-specific, processes a single item.

Two Key Interfaces

The queue manager talks to the application through two narrow interfaces: the source interface (how work items are discovered) and the worker interface (how an item is handed to a worker).

1. Source Interface (HTTP)

GET http://localhost/api/v1/items
GET http://localhost/api/v1/items/<item-name>

Response format:

{
   "kind": "ItemList",
   "apiVersion": "v1",
   "items": [
      "item-1",
      "item-2"
   ]
}

Important:

- Item names are used directly as Job names, so they must be unique and stable across calls; the manager matches items to existing Jobs by name.
- Names therefore also need to be valid Kubernetes object names (lowercase, DNS-style).

2. Worker Interface (File-based)

Simpler and more secure than an HTTP interface: the worker never has to serve a network endpoint. Instead, the item to process is handed to the worker container through its environment or filesystem (the ffmpeg worker below reads ${INPUT_FILE}).
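A minimal sketch of what that file-based handoff might look like on the worker side; the WORK_ITEM variable and the /etc/work-item path are illustrative, not part of the original design:

```python
import os

def read_work_item(env_var="WORK_ITEM", fallback_path="/etc/work-item"):
    """Return the item this worker should process.

    The queue manager hands the item over through the container's
    environment or a mounted file; both names here are assumptions.
    """
    item = os.environ.get(env_var)
    if item is not None:
        return item
    # Fall back to a file mounted into the container by the Job spec.
    with open(fallback_path) as f:
        return f.read().strip()
```

Either way, the worker stays a passive program with no listening port to secure.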


Core Work Queue Algorithm

Repeat forever
    Get items from source
    Get current jobs
    Find unprocessed items
    Create jobs for them

Full Work Queue Implementation (Python)

import requests
import json
from kubernetes import client, config
import time

namespace = "default"

def make_container(item, obj):
    # Every item runs the same worker image; the item itself is handed
    # to the worker out of band (e.g. via the environment or filesystem).
    container = client.V1Container(name="worker", image="my/worker-image")
    return container

def make_job(item):
    # Fetch the item's details from the source.
    response = requests.get("http://localhost:8000/items/{}".format(item))
    obj = json.loads(response.text)
    # The item name doubles as the Job name, which is how the queue
    # manager later recognizes items that already have a Job.
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name=item),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[make_container(item, obj)],
                )
            )
        ),
    )
    return job

def update_queue(batch):
    # Desired state: the items the source currently reports.
    response = requests.get("http://localhost:8000/items")
    items = json.loads(response.text)['items']

    # Actual state: the Jobs that already exist in the namespace.
    ret = batch.list_namespaced_job(namespace, watch=False)
    existing = {j.metadata.name for j in ret.items}

    # Create a Job for every item that does not have one yet.
    for item in items:
        if item not in existing:
            batch.create_namespaced_job(namespace, make_job(item))

config.load_kube_config()
batch = client.BatchV1Api()

# Re-sync desired and actual state every 10 seconds.
while True:
    update_queue(batch)
    time.sleep(10)

Key idea: this is a level-triggered reconciliation loop. Each pass compares desired state (items reported by the source) against actual state (Jobs that already exist) and creates Jobs only for the difference, so the manager can crash and restart at any point without losing or duplicating work.

Example: Source Container (Node.js)

const http = require('http');
const fs = require('fs');

const port = 8080;
const path = process.env.MEDIA_PATH;

const requestHandler = (request, response) => {
	console.log(request.url);
	// fs.readdir does not expand globs, so list the directory
	// and filter for the media files we care about.
	fs.readdir(path, (err, items) => {
		const msg = {
			'kind': 'ItemList',
			'apiVersion': 'v1',
			'items': []
		};
		if (!err && items) {
			for (const item of items) {
				if (item.endsWith('.mp4')) {
					msg.items.push(item);
				}
			}
		}
		response.end(JSON.stringify(msg));
	});
}

const server = http.createServer(requestHandler);

server.listen(port, (err) => {
	if (err) {
		return console.log('Error starting server', err);
	}

	console.log(`server is active on ${port}`)
});

Lists the work items: the .mp4 files found under MEDIA_PATH.


Example: Worker (ffmpeg)

ffmpeg -i ${INPUT_FILE} -frames:v 100 thumb.png

Processes each item independently
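The worker side can be wrapped in a tiny driver. A sketch assuming the exact ffmpeg invocation above; the function names are illustrative:

```python
import subprocess

def thumbnail_cmd(input_file, output="thumb.png", frames=100):
    # Mirrors the invocation above: extract up to `frames` frames.
    return ["ffmpeg", "-i", input_file, "-frames:v", str(frames), output]

def process_item(input_file):
    # check=True makes a failed conversion exit non-zero, so the
    # surrounding Kubernetes Job is marked failed and can be retried.
    subprocess.run(thumbnail_cmd(input_file), check=True)
```

Because each invocation touches only its own input file, workers never coordinate with one another.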


Scaling Insights

Key Metrics:

- Interarrival time: how often a new work item appears.
- Processing time: how long one worker takes to finish one item.
- Parallelism: how many items are processed concurrently.

Rule:

Processing time / parallelism < interarrival time

Cases:

- If the inequality holds, the queue drains and stays empty on average.
- If it fails, the backlog grows without bound; increase parallelism (more workers) or reduce per-item processing time.
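The rule can be checked numerically. A sketch with made-up numbers (30 s of work per item, one new item every 10 s):

```python
import math

def keeps_up(processing_time_s, parallelism, interarrival_s):
    # The queue drains only while effective per-item service time
    # (processing time spread across workers) beats the arrival rate.
    return processing_time_s / parallelism < interarrival_s

def min_parallelism(processing_time_s, interarrival_s):
    # Smallest worker count satisfying processing_time / p < interarrival.
    return math.floor(processing_time_s / interarrival_s) + 1

# 30 s of work arriving every 10 s: three workers only break even,
# so a fourth is needed to actually drain the queue.
print(min_parallelism(30, 10))  # 4
```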


Scaling Strategies

- Scale out: raise Job parallelism or run more worker pods.
- Scale the work: split large items into smaller independent ones.
- Co-locate workers: pack several worker containers into one pod (the multiworker pattern).

Multiworker Pattern

A single adapter container pulls items from the shared queue and fans them out to several worker containers running in the same pod, typically over localhost.

Example: one pod runs the multiworker container plus several copies of the thumbnail worker; each pulled item goes to whichever worker is idle.

Benefits:

- Existing single-item worker images are reused unchanged.
- Queue-consuming logic lives in one place instead of in every worker.
- Per-pod overhead is amortized across several workers.
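A rough in-process sketch of the fan-out, where threads and plain functions stand in for the co-located worker containers (nothing here is from the original text):

```python
import queue
import threading

def multiworker(items, handlers):
    """Dispatch every item to whichever handler is free, via one shared queue."""
    q = queue.Queue()
    for item in items:
        q.put(item)

    results = []
    lock = threading.Lock()

    def drain(handler):
        # Each "worker" keeps pulling until the queue is empty.
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                return
            out = handler(item)
            with lock:
                results.append(out)

    threads = [threading.Thread(target=drain, args=(h,)) for h in handlers]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In a real pod the handlers would be HTTP calls to sibling containers on localhost rather than local functions.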


Key Insights

- The queue manager is completely generic: only the worker image and the source URL are application-specific.
- Mapping one item to one Kubernetes Job means Kubernetes itself tracks completion and retries failed work.
- Because reconciliation is driven by observed state, the manager needs no durable storage of its own.

Trade-offs

Pros:

- Workers scale horizontally and independently of the queue manager.
- The manager is reusable across completely different workloads.
- Kubernetes Jobs provide retry and completion tracking for free.

Cons:

- One Job per item adds scheduling overhead, which hurts for very small items.
- The manager polls (every 10 seconds here), so new items see up to that much extra latency.
- Item names must be unique, stable, and valid object names, which constrains the source.

One-line Summary

Work queues process independent tasks using scalable workers, with a generic queue manager orchestrating jobs and ensuring reliable execution.

#Distributed Systems #System Design #Work Queues #Batch Processing #Kubernetes