System Design
Message Queues asoslari
Message Queue — asinxron xabar almashish tizimi.
Muammo: Synchronous Processing
// User request
app.post('/signup', async (req, res) => {
await db.createUser(req.body); // 50ms
await sendEmail(req.body.email); // 2000ms
await createProfile(req.body); // 100ms
await notifyAdmins(req.body); // 500ms
res.json({ success: true });
});
// Total: 2650ms
User 2.6 sekund kutadi!
Yechim: Message Queue
// Fast response
app.post('/signup', async (req, res) => {
await db.createUser(req.body); // 50ms
await queue.publish('user.created', req.body); // 5ms
res.json({ success: true });
});
// Total: 55ms
// Background worker
queue.subscribe('user.created', async (data) => {
await sendEmail(data.email);
await createProfile(data);
await notifyAdmins(data);
});
User 55ms ichida javob oladi!
Producer-Consumer Pattern
┌──────────┐ Publish ┌───────┐
│ Producer ├──────────────→ │ Queue │
└──────────┘ └───┬───┘
│
Subscribe
│
┌───────┴────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│Consumer 1│ │Consumer 2│
└──────────┘ └──────────┘
Decoupling: Producer va consumer mustaqil.
Message Queue Benefits
1. Asynchronous Processing
// Without queue: 2000ms
await processImage(file); // Heavy task
// With queue: 10ms
await queue.publish('image.upload', { file });
2. Load Leveling
Peak traffic: 10,000 req/sec
Queue: Absorb spike
Workers: Process at steady rate (1000/sec)
Queue to’ladi, lekin sistemaga yuklama teng.
3. Reliability
Worker crash → Message returns to queue
Retry automatically
4. Scalability
1 worker → slow
10 workers → 10x faster
Add/remove workers dynamically
Popular Message Queues
1. RabbitMQ
const amqp = require('amqplib');
// Producer
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('tasks');
channel.sendToQueue('tasks', Buffer.from(JSON.stringify({ task: 'process' })));
// Consumer
channel.consume('tasks', (msg) => {
const data = JSON.parse(msg.content.toString());
console.log('Processing:', data);
channel.ack(msg); // Acknowledge
});
Reliable, feature-rich
Complex setup
2. Redis (Simple Queue)
// Producer
await redis.lPush('queue', JSON.stringify({ task: 'email' }));
// Consumer
while (true) {
const msg = await redis.brPop('queue', 0); // Blocking pop
const data = JSON.parse(msg.element);
await processTask(data);
}
Simple, fast
No advanced features
3. AWS SQS
const AWS = require('aws-sdk');
const sqs = new AWS.SQS();
// Send
await sqs.sendMessage({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/myqueue',
MessageBody: JSON.stringify({ task: 'process' })
}).promise();
// Receive
const messages = await sqs.receiveMessage({
QueueUrl: '...',
MaxNumberOfMessages: 10
}).promise();
Fully managed, scalable
** Pay per request**
4. Apache Kafka (High throughput)
// Producer
await producer.send({
topic: 'user-events',
messages: [{ value: JSON.stringify({ userId: 123 }) }]
});
// Consumer
await consumer.subscribe({ topic: 'user-events' });
await consumer.run({
eachMessage: async ({ message }) => {
const data = JSON.parse(message.value.toString());
console.log(data);
}
});
Millions of messages/sec
Complex, overkill for simple use cases
Message Acknowledgment
// Manual ack
queue.subscribe('tasks', async (msg) => {
try {
await processTask(msg.data);
msg.ack(); // Success
} catch (err) {
msg.nack(); // Retry
}
});
Ack qilinmasa → message qaytadan qayta ishlanadi.
Dead Letter Queue
Message → Process → Fail
↓
Retry 1 → Fail
↓
Retry 2 → Fail
↓
Retry 3 → Fail
↓
Dead Letter Queue (manual investigation)
// RabbitMQ
await channel.assertQueue('tasks', {
deadLetterExchange: 'dlx',
messageTtl: 60000, // 1 min
maxRetries: 3
});
Priority Queue
// High priority tasks first
await queue.publish('tasks', { task: 'urgent' }, { priority: 10 });
await queue.publish('tasks', { task: 'normal' }, { priority: 5 });
// Consumer processes priority 10 first
Message Routing (RabbitMQ)
1. Direct Exchange
// Producer
channel.publish('logs', 'error', Buffer.from('Error occurred'));
// Consumer
channel.bindQueue('error-queue', 'logs', 'error'); // Only 'error' messages
2. Topic Exchange
// Producer
channel.publish('logs', 'app.error.critical', msg);
// Consumer
channel.bindQueue('critical-queue', 'logs', 'app.*.critical'); // Wildcard
3. Fanout Exchange
// Broadcast to all queues
channel.publish('notifications', '', msg); // All subscribers receive
Best Practices
1. Idempotent consumers
// Message may be delivered twice!
async function processOrder(orderId) {
const exists = await db.exists(`processed:${orderId}`);
if (exists) return; // Skip duplicate
await processPayment(orderId);
await db.set(`processed:${orderId}`, true);
}
2. Message TTL
// Expire old messages
await queue.publish('tasks', data, {
expiration: 3600000 // 1 hour
});
3. Monitoring
// Queue depth alert
const depth = await queue.count();
if (depth > 10000) {
alert('Queue backing up!');
}
4. Graceful shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down...');
await consumer.stop(); // Finish current message
process.exit(0);
});
Use Cases
1. Email sending
queue.publish('email', { to: 'user@example.com', subject: '...' });
2. Image processing
queue.publish('image.resize', { url: '...', sizes: [100, 200, 400] });
3. Analytics events
queue.publish('analytics', { event: 'page_view', userId: 123 });
4. Scheduled tasks
queue.publish('tasks', { type: 'cleanup' }, { delay: 3600000 }); // 1 hour
Xulosa
Message Queue:
- Asynchronous processing
- Load leveling
- Reliability (retry)
- Scalability
Popular:
- RabbitMQ - feature-rich
- Redis - simple, fast
- SQS - managed
- Kafka - high throughput
Best practices:
- Idempotent consumers
- Dead letter queue
- Monitoring
- Graceful shutdown
Keyingi dars: Publish-Subscribe pattern.