System Design

Message Queue Basics

A message queue is a system for exchanging messages asynchronously.

Problem: Synchronous Processing

// User request
app.post('/signup', async (req, res) => {
  await db.createUser(req.body);        // 50ms
  await sendEmail(req.body.email);      // 2000ms 
  await createProfile(req.body);        // 100ms
  await notifyAdmins(req.body);         // 500ms
  
  res.json({ success: true });
});

// Total: 2650ms 

The user waits about 2.65 seconds!

Solution: Message Queue

// Fast response
app.post('/signup', async (req, res) => {
  await db.createUser(req.body);        // 50ms
  await queue.publish('user.created', req.body); // 5ms
  
  res.json({ success: true });
});

// Total: 55ms 

// Background worker
queue.subscribe('user.created', async (data) => {
  await sendEmail(data.email);
  await createProfile(data);
  await notifyAdmins(data);
});

The user gets a response in about 55 ms!
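To make the timing difference concrete, here is a minimal in-memory sketch of the publish/subscribe calls used above. `InMemoryQueue` and its `drain` method are hypothetical names invented for this illustration; a real broker persists messages and delivers them to separate worker processes.

```javascript
// Hypothetical in-memory stand-in for a message broker (illustration only:
// no persistence, no network, single process).
class InMemoryQueue {
  constructor() {
    this.pending = new Map();  // topic -> messages waiting for delivery
    this.handlers = new Map(); // topic -> subscribed handler functions
  }

  // Store the message and return immediately; no handler runs here.
  publish(topic, data) {
    if (!this.pending.has(topic)) this.pending.set(topic, []);
    this.pending.get(topic).push(data);
  }

  subscribe(topic, handler) {
    if (!this.handlers.has(topic)) this.handlers.set(topic, []);
    this.handlers.get(topic).push(handler);
  }

  // Deliver everything that is waiting. A real broker does this continuously
  // in the background; here it is an explicit step so the timing is visible.
  drain() {
    let delivered = 0;
    for (const [topic, messages] of this.pending) {
      const handlers = this.handlers.get(topic) || [];
      if (handlers.length === 0) continue; // keep messages until someone subscribes
      while (messages.length > 0) {
        const data = messages.shift();
        for (const handler of handlers) handler(data);
        delivered += 1;
      }
    }
    return delivered;
  }
}

const queue = new InMemoryQueue();
const emailsSent = [];
queue.subscribe('user.created', (user) => emailsSent.push(user.email));

queue.publish('user.created', { email: 'user@example.com' });
console.log(emailsSent.length); // 0: publishing did not wait for the email
queue.drain();
console.log(emailsSent.length); // 1: the worker handled it later
```

The request handler only pays for `publish`; the slow work happens whenever the worker side gets to it.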

Producer-Consumer Pattern

┌──────────┐    Publish     ┌───────┐
│ Producer ├───────────────→│ Queue │
└──────────┘                └───┬───┘
                                │
                            Subscribe
                                │
                        ┌───────┴────────┐
                        ▼                ▼
                   ┌──────────┐    ┌──────────┐
                   │Consumer 1│    │Consumer 2│
                   └──────────┘    └──────────┘

Decoupling: the producer and the consumers are independent of each other.

Message Queue Benefits

1. Asynchronous Processing

// Without queue: 2000ms
await processImage(file); // Heavy task

// With queue: 10ms
await queue.publish('image.upload', { file });

2. Load Leveling

Peak traffic: 10,000 req/sec
Queue: Absorb spike
Workers: Process at steady rate (1000/sec)

The queue fills up, but the load on the system stays even.
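The numbers above can be checked with a small simulation. This is only a sketch: `simulateQueueDepth` is a hypothetical helper, and it assumes workers process at a perfectly steady rate.

```javascript
// Simulate queue depth second by second.
// arrivals:   number of messages arriving in each second
// workerRate: messages the workers can process per second
function simulateQueueDepth(arrivals, workerRate) {
  const depths = [];
  let depth = 0;
  for (const arrived of arrivals) {
    depth += arrived;                     // the spike is absorbed by the queue
    depth -= Math.min(depth, workerRate); // workers never exceed their rate
    depths.push(depth);
  }
  return depths;
}

// A one-second spike of 10,000 messages, then nothing for 9 more seconds.
const arrivals = [10000, 0, 0, 0, 0, 0, 0, 0, 0, 0];
const depths = simulateQueueDepth(arrivals, 1000);
console.log(depths.join(', '));
// 9000, 8000, 7000, 6000, 5000, 4000, 3000, 2000, 1000, 0
```

The workers see a constant 1,000 messages per second for ten seconds instead of a single 10,000-message burst.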

3. Reliability

Worker crash → Message returns to queue
Retry automatically 

4. Scalability

1 worker → slow
10 workers → up to ~10x throughput (if tasks are independent and nothing downstream is the bottleneck)
Add/remove workers dynamically

Popular Message Queues

1. RabbitMQ

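const amqp = require('amqplib');

// Run inside an async function (or an ES module with top-level await).
// Producer
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('tasks'); // Create the queue if it does not exist
channel.sendToQueue('tasks', Buffer.from(JSON.stringify({ task: 'process' })));

// Consumer
await channel.consume('tasks', (msg) => {
  if (msg === null) return; // The consumer was cancelled by the broker
  const data = JSON.parse(msg.content.toString());
  console.log('Processing:', data);
  channel.ack(msg); // Acknowledge so the broker can delete the message
});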

Pros: reliable, feature-rich
Cons: complex setup

2. Redis (Simple Queue)

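const { createClient } = require('redis'); // node-redis v4+

const redis = createClient();
await redis.connect();

// Producer
await redis.lPush('queue', JSON.stringify({ task: 'email' }));

// Consumer (use a dedicated connection: a blocking pop ties up the client)
while (true) {
  const msg = await redis.brPop('queue', 0); // Blocking pop, 0 = wait forever
  const data = JSON.parse(msg.element);
  await processTask(data);
}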

Pros: simple, fast
Cons: no advanced features

3. AWS SQS

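const AWS = require('aws-sdk'); // AWS SDK for JavaScript v2
const sqs = new AWS.SQS({ region: 'us-east-1' });

// Send
await sqs.sendMessage({
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/myqueue',
  MessageBody: JSON.stringify({ task: 'process' })
}).promise();

// Receive
const response = await sqs.receiveMessage({
  QueueUrl: '...',
  MaxNumberOfMessages: 10
}).promise();
const messages = response.Messages || []; // Messages is absent when the queue is empty
// Delete each message with sqs.deleteMessage after processing it; otherwise
// it becomes visible again once its visibility timeout expires.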

Pros: fully managed, scalable
Cons: pay per request

4. Apache Kafka (High throughput)

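const { Kafka } = require('kafkajs');

// clientId, broker address, and groupId are placeholder values.
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'my-group' });

// Producer
await producer.connect();
await producer.send({
  topic: 'user-events',
  messages: [{ value: JSON.stringify({ userId: 123 }) }]
});

// Consumer
await consumer.connect();
await consumer.subscribe({ topic: 'user-events' });
await consumer.run({
  eachMessage: async ({ message }) => {
    const data = JSON.parse(message.value.toString());
    console.log(data);
  }
});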

Pros: millions of messages/sec
Cons: complex, overkill for simple use cases

Message Acknowledgment

// Manual ack
queue.subscribe('tasks', async (msg) => {
  try {
    await processTask(msg.data);
    msg.ack(); // Success 
  } catch (err) {
    msg.nack(); // Retry 
  }
});

If a message is not acked, it is redelivered and processed again.
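The following sketch shows one way a broker can implement this behaviour: a delivered message stays "in flight" until it is acked, and a nack puts it back. `AckQueue` is a hypothetical in-memory class, not the API of any particular broker.

```javascript
// Hypothetical in-memory queue that tracks unacknowledged messages.
class AckQueue {
  constructor() {
    this.ready = [];           // messages waiting for a consumer
    this.inFlight = new Map(); // id -> message delivered but not yet acked
    this.nextId = 1;
  }

  publish(data) {
    this.ready.push({ id: this.nextId++, data, deliveries: 0 });
  }

  // Hand the oldest message to a consumer, but remember it until it is acked.
  receive() {
    const msg = this.ready.shift();
    if (!msg) return null;
    msg.deliveries += 1;
    this.inFlight.set(msg.id, msg);
    return msg;
  }

  // Success: forget the message for good.
  ack(id) {
    this.inFlight.delete(id);
  }

  // Failure: put the message back so it can be delivered again.
  nack(id) {
    const msg = this.inFlight.get(id);
    if (!msg) return;
    this.inFlight.delete(id);
    this.ready.push(msg);
  }
}

const q = new AckQueue();
q.publish({ task: 'email' });

const first = q.receive();
q.nack(first.id);  // processing failed, so the message goes back
const second = q.receive();
q.ack(second.id);  // processing succeeded, so the message is gone

console.log(second.deliveries); // 2
console.log(q.receive());       // null
```

Real brokers also requeue in-flight messages when a consumer disconnects or a timeout expires, which this sketch leaves out.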

Dead Letter Queue

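Message → Process → Fail
            ↓
      Retry 1 → Fail
            ↓
      Retry 2 → Fail
            ↓
      Retry 3 → Fail
            ↓
  Dead Letter Queue (manual investigation)

// RabbitMQ (amqplib)
await channel.assertQueue('tasks', {
  deadLetterExchange: 'dlx', // Rejected or expired messages are re-routed here
  messageTtl: 60000          // 1 min
  // assertQueue has no maxRetries option: count retries in the consumer,
  // or use a quorum queue with the 'x-delivery-limit' argument.
});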
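The retry flow in the diagram can be sketched in plain JavaScript. `processWithRetries` and the `deadLetters` array are hypothetical names for this illustration; a production consumer would usually also wait (with backoff) between attempts.

```javascript
// Try the handler once plus up to maxRetries retries. If every attempt
// fails, park the message in a dead letter list instead of losing it.
function processWithRetries(message, handler, maxRetries, deadLetters) {
  let lastError = null;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      handler(message);
      return { ok: true, attempts: attempt + 1 };
    } catch (err) {
      lastError = err; // remember why it failed, then try again
    }
  }
  deadLetters.push({ message, error: lastError.message });
  return { ok: false, attempts: maxRetries + 1 };
}

const deadLetters = [];

// A handler that always fails ends up in the dead letter list.
const failed = processWithRetries(
  { task: 'broken' },
  () => { throw new Error('boom'); },
  3,
  deadLetters
);
console.log(failed.attempts, deadLetters.length); // 4 1

// A handler that fails twice and then succeeds never reaches it.
let calls = 0;
const recovered = processWithRetries(
  { task: 'flaky' },
  () => { calls += 1; if (calls < 3) throw new Error('temporary'); },
  3,
  deadLetters
);
console.log(recovered.ok, recovered.attempts); // true 3
```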

Priority Queue

// High priority tasks first
await queue.publish('tasks', { task: 'urgent' }, { priority: 10 });
await queue.publish('tasks', { task: 'normal' }, { priority: 5 });

// Consumer processes priority 10 first
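As a rough illustration of the ordering, here is a tiny in-memory priority queue. `PriorityQueue` is a hypothetical class; it re-sorts on every publish for simplicity, where a real implementation would more likely use a heap.

```javascript
// Hypothetical in-memory priority queue: higher numbers are served first,
// and messages with equal priority keep their arrival order.
class PriorityQueue {
  constructor() {
    this.items = [];
    this.seq = 0; // arrival counter, used to break ties
  }

  publish(data, priority = 0) {
    this.items.push({ data, priority, seq: this.seq++ });
    this.items.sort((a, b) => b.priority - a.priority || a.seq - b.seq);
  }

  // Return the next message, or undefined when the queue is empty.
  next() {
    const item = this.items.shift();
    return item ? item.data : undefined;
  }
}

const tasks = new PriorityQueue();
tasks.publish({ task: 'normal-1' }, 5);
tasks.publish({ task: 'urgent' }, 10);
tasks.publish({ task: 'normal-2' }, 5);

const order = [tasks.next().task, tasks.next().task, tasks.next().task];
console.log(order.join(', ')); // urgent, normal-1, normal-2
```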

Message Routing (RabbitMQ)

1. Direct Exchange

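// Producer
await channel.assertExchange('direct_logs', 'direct');
channel.publish('direct_logs', 'error', Buffer.from('Error occurred'));

// Consumer
await channel.assertQueue('error-queue');
await channel.bindQueue('error-queue', 'direct_logs', 'error'); // Only 'error' messages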

2. Topic Exchange

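// Producer (msg is a Buffer)
await channel.assertExchange('topic_logs', 'topic');
channel.publish('topic_logs', 'app.error.critical', msg);

// Consumer
await channel.assertQueue('critical-queue');
await channel.bindQueue('critical-queue', 'topic_logs', 'app.*.critical'); // '*' matches exactly one word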

3. Fanout Exchange

// Broadcast to all queues
channel.publish('notifications', '', msg); // All subscribers receive
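The wildcard matching used by the topic exchange can be sketched as a small function. `topicMatches` is a hypothetical helper written for this lesson, not part of amqplib; it follows the usual AMQP topic rules, where `*` matches exactly one dot-separated word and `#` matches zero or more words.

```javascript
// Does a topic-exchange binding pattern match a routing key?
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // Compare pattern words from index pi against key words from index ki.
  function match(pi, ki) {
    if (pi === p.length) return ki === k.length; // both must be used up
    if (p[pi] === '#') {
      // '#' may swallow any number of the remaining words, including none.
      for (let rest = ki; rest <= k.length; rest++) {
        if (match(pi + 1, rest)) return true;
      }
      return false;
    }
    if (ki === k.length) return false; // pattern still needs a word
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('app.*.critical', 'app.error.critical')); // true
console.log(topicMatches('app.*.critical', 'app.critical'));       // false
console.log(topicMatches('app.#', 'app.error.critical'));          // true
```

A direct exchange is the special case with no wildcards (exact match), and a fanout exchange ignores the routing key entirely.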

Best Practices

1. Idempotent consumers

// Message may be delivered twice!
async function processOrder(orderId) {
  const exists = await db.exists(`processed:${orderId}`);
  if (exists) return; // Skip duplicate
  
  await processPayment(orderId);
  await db.set(`processed:${orderId}`, true);
}
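A runnable version of the same idea, with an in-memory `Set` standing in for the database. The names `processed`, `charges`, and `handleOrderMessage` are hypothetical; in production the "already processed" check and the write need to be atomic (for example a unique key constraint), since two workers may receive the duplicate at the same time.

```javascript
const processed = new Set(); // stand-in for a durable store of handled order IDs
const charges = [];          // records the side effect we must not repeat

function handleOrderMessage(message) {
  if (processed.has(message.orderId)) return 'skipped'; // duplicate delivery
  charges.push(message.orderId); // charge the customer (simulated)
  processed.add(message.orderId);
  return 'processed';
}

// The broker delivers the same message twice.
const firstResult = handleOrderMessage({ orderId: 'A1' });
const secondResult = handleOrderMessage({ orderId: 'A1' });

console.log(firstResult, secondResult); // processed skipped
console.log(charges.length);            // 1
```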

2. Message TTL

// Expire old messages
await queue.publish('tasks', data, { 
  expiration: 3600000 // 1 hour
});
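Conceptually, a TTL is an expiry timestamp attached at publish time and checked at delivery time. The helpers below (`withTtl`, `dropExpired`) are hypothetical and take the current time as a parameter so the behaviour is easy to verify.

```javascript
// Wrap a payload with an absolute expiry time.
function withTtl(data, ttlMs, now = Date.now()) {
  return { data, expiresAt: now + ttlMs };
}

// Keep only the messages that have not expired yet.
function dropExpired(messages, now = Date.now()) {
  return messages.filter((m) => m.expiresAt > now);
}

const publishedAt = 1000000; // fixed clock value for the example
const inbox = [
  withTtl({ task: 'short-lived' }, 60000, publishedAt),  // 1 minute
  withTtl({ task: 'long-lived' }, 3600000, publishedAt), // 1 hour
];

// Ten minutes later only the one-hour message is still deliverable.
const fresh = dropExpired(inbox, publishedAt + 600000);
console.log(fresh.map((m) => m.data.task).join(', ')); // long-lived
```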

3. Monitoring

// Queue depth alert
const depth = await queue.count();
if (depth > 10000) {
  console.warn('Queue backing up!', depth); // Forward this to your alerting system
}

4. Graceful shutdown

process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  await consumer.stop(); // Finish current message
  process.exit(0);
});

Use Cases

1. Email sending

queue.publish('email', { to: 'user@example.com', subject: '...' });

2. Image processing

queue.publish('image.resize', { url: '...', sizes: [100, 200, 400] });

3. Analytics events

queue.publish('analytics', { event: 'page_view', userId: 123 });

4. Scheduled tasks

queue.publish('tasks', { type: 'cleanup' }, { delay: 3600000 }); // 1 hour
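The `delay` option above can be thought of as a "not before" timestamp on each message. A minimal sketch, assuming a hypothetical `DelayQueue` class that is polled with an explicit clock value:

```javascript
// Hypothetical in-memory delay queue: messages become visible only after
// their delay has elapsed.
class DelayQueue {
  constructor() {
    this.items = [];
  }

  publish(data, { delay = 0 } = {}, now = Date.now()) {
    this.items.push({ data, dueAt: now + delay });
  }

  // Return (and remove) every message whose delay has elapsed.
  pollDue(now = Date.now()) {
    const due = this.items.filter((m) => m.dueAt <= now);
    this.items = this.items.filter((m) => m.dueAt > now);
    return due.map((m) => m.data);
  }
}

const scheduled = new DelayQueue();
const start = 0; // fixed clock value for the example
scheduled.publish({ type: 'cleanup' }, { delay: 3600000 }, start); // 1 hour
scheduled.publish({ type: 'welcome' }, {}, start);                 // no delay

const dueNow = scheduled.pollDue(start);
const dueLater = scheduled.pollDue(start + 3600000);
console.log(dueNow.map((m) => m.type).join(', '));   // welcome
console.log(dueLater.map((m) => m.type).join(', ')); // cleanup
```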

Summary

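Message Queue: asynchronous processing, load leveling, reliability, scalability.

Popular: RabbitMQ, Redis, AWS SQS, Apache Kafka.

Best practices: idempotent consumers, message TTL, monitoring, graceful shutdown.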

Next lesson: the Publish-Subscribe pattern.