Pub/Sub

Pub/Sub

Pub/SubPublisher와, Subscriber를 사용하여 비동기적으로 통신하는 방법이다. YouTube의 채널을 구독하여 알람을 받거나, 메세지를 주고받을때 활용할 수 있는 방법이다. 이벤트를 발행하는 Publisher가 특정 Channel에 이벤트를 전송하고, Channel을 구독하고 있는 Subscriber가 해당 이벤트를 받는 방식으로 동작한다.

Pub/Sub의 방식은 대표적으로 2가지가 있다.

KAFKA

Pub/Sub

KAFKA에서는 Producer/Consumer라는 개념이 있는데 이것은 Publisher/Subscriber와 동일하다. ProducerTopic에게 이벤트를 보내고 안에 있는 각Partition에 분산되어 저장된다. Consumer Group내의 Consumer는 각각 1개 이상의 Partition으로 부터 이벤트를 가져온다. 만약 Partition의 개수보다 Consumer의 개수가 많다면 일을 하지 않는 Consumer가 생기기 때문에 Partition의 수를 Consumer보다 같거나 크게 설정 해 주어야 한다.

각 이벤트가 Partition에 저장되며 높은 처리량과 이벤트의 안정성을 보장한다. 하지만 설정과 운영이 복잡하다고 카더라.

Redis

Pub/Sub

Redis는 각SubscriberChannel을 구독하는 방식이다. KAFKA의 Partition과 다르게 Channel은 이벤트를 저장하지 않는다. 이벤트가 도착하였을때 Subscriber가 존재하지 않는다면 이벤트는 사라진다. delivery guarantee(이벤트/메세지 보장)하지못하며 확장성에 한계가 있다.

매우 빠른 구현이 가능하며, 메모리 기반 데이터베이스이기 때문에 실시간 성능이 뛰어나다. 하지만 앞서 언급 했듯이 Subscriber가 없으면 이벤트를 저장하지 않아 이벤트의 안정성을 보장하지 못한다.

구현

빠른 설정과 구현이 가능한 Redis로 구현해보았으며, NestJs를 사용했다.

NestJs 프로젝트를 생성하고 Redis 외부접속, 통신하기 위하여 ioredis 패키지를 설치한다

npm install ioredis

이후 redis Module, Service를 생성하여 Redis 클라이언트를 생성하고 관리하는 코드를 작성한다. Redis의 ioredis 클라이언트를 사용하여 Redis와 연결을 관리하고 클라이언트 인스턴스를 제공한다.

//redis.module.ts
import { Module } from '@nestjs/common';
import { RedisService } from './redis.service';

@Module({
  providers: [RedisService],
  exports: [RedisService],
})
export class RedisModule {}
//redis.service.ts
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import Redis from 'ioredis';

@Injectable()
export class RedisService implements OnModuleInit, OnModuleDestroy {
  private readonly redisClient: Redis;

  constructor() {
    this.redisClient = new Redis();
  }

  onModuleInit() {
    console.log('Redis client connected');
  }

  onModuleDestroy() {
    this.redisClient.quit();
  }

  get client(): Redis {
    return this.redisClient;
  }
}

이후 Redis의 Pub/Sub 기능을 활용하여 Publish, Subscribe이 가능하도록 Controller와 Service를 생성한다

//pubsub.controller.ts
import { Controller, Post, Body, Get, Logger } from '@nestjs/common';
import { PubSubService } from './pubsub.service';

@Controller('pubsub')
export class PubSubController {
  private readonly logger = new Logger(PubSubController.name);
  constructor(private readonly pubSubService: PubSubService) {}

  @Post('publish')
  async publishMessage(@Body() body: { channel: string; message: string }) {
    await this.pubSubService.publish(body.channel, body.message);
    const message = `Channel: ${body.channel} Message: ${body.message}`;
    return { message };
  }

  @Get('subscribe')
  async subscribeToChannel() {
    //'my-channel' 채널을 Subscribe 한다.
    await this.pubSubService.subscribe('my-channel', (message) => {
      this.logger.log(`Received message: ${message}`);
    });
    return { success: true };
  }
}
import { Injectable, Logger } from '@nestjs/common';
import { RedisService } from 'src/redis/redis.service';

@Injectable()
export class PubSubService {
  private readonly logger = new Logger(PubSubService.name);
  constructor(private readonly redisService: RedisService) {}

  async publish(channel: string, message: string): Promise<void> {
    //Redis에 이벤트를 발행한다.
    await this.redisService.client.publish(channel, message);
    this.logger.log(`Event Published to ${channel}: ${message}`);
  }

  async subscribe(
    channel: string,
    callback: (message: string) => void,
  ): Promise<void> {
    //구독 전용 클라이언트를 Redis 클라이언트를 복제하여 생성한다. Publish와 Subscribe를 분리하여 안정성을 높인다.
    const subscriber = this.redisService.client.duplicate();
    //Channel을 구독한다.
    await subscriber.subscribe(channel);
    this.logger.log(`Subscribe to Channel: ${channel}`);
    //지정된 Channel을 구독하고 메세지를 수신하면 callback 함수를 호출한다.
    subscriber.on('message', (ch, message) => {
      if (ch === channel) {
        this.logger.log(`Message received from ${channel}: ${message}`);
        callback(message);
      }
    });
  }
}

이제 PostMan으로 Publish와 Subscribe를 해보자. 앞서 언급했던대로 Redis는 구독자가 없으면 이벤트가 사라진다.

  { "channel": "my-channel", "message": "Manly-Backend-Developer" }

메세지 타입은 String만 가능하므로 Json을 사용하려면 subscribe에서 추가적인 작업이 필요하다.

Pub/Sub

처음 아무도 구독하지 않은 상태에서 발행을 해도 아무도 받는사람없이 사라진다. 이후 구독을 한 후에 이벤트를 발행하면 구독자에게 응답이 간다.

확실히재밌다 근데 Docker공부를좀..