NestJS Logo

Kafka

Kafka 是一个开源的分布式流处理平台,具有三个关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。
  • 以容错持久的方式存储记录流。
  • 在记录发生时处理记录流。

Kafka 项目旨在为处理实时数据流提供统一的、高吞吐量、低延迟的平台。它与 Apache Storm 和 Spark 非常好地集成,用于实时流式数据分析。

安装#

要开始构建基于Kafka的微服务,首先安装所需的包:


$ npm i --save kafkajs

概述#

与其他Nest微服务传输层实现类似,您可以使用传递给createMicroservice()方法的选项对象的transport属性来选择Kafka传输机制,以及一个可选的options属性,如下所示:

main.ts
JS TS

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});
提示Transport枚举是从@nestjs/microservices包中导入的。

选项#

options属性是特定于所选择的传输器的。Kafka传输器公开了如下所述的属性。

client客户端配置选项 (详细阅读 在这里)
consumer消费者配置选项 (详细阅读 在这里)
run运行配置选项 (详细阅读 在这里)
subscribe订阅配置选项 (详细阅读 在这里)
producer生产者配置选项 (详细阅读 在这里)
send发送配置选项 (详细阅读 在这里)
producerOnlyMode跳过消费者组注册,仅充当生产者的特性标志 (boolean)
postfixId更改clientId值的后缀(string)

客户端#

在Kafka中,与其他微服务传输器相比,有一个小差异。我们使用ClientKafka类,而不是ClientProxy类。

与其他微服务传输器一样,您有多种选项可以创建ClientKafka实例。

创建实例的一种方法是使用ClientsModule。要使用ClientsModule创建客户端实例,导入它并使用register()方法传递一个选项对象,其中包含与上述createMicroservice()方法中显示的相同属性,以及一个用作注入令牌的name属性。在此处了解更多关于ClientsModule的信息。


@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

创建客户端的其他选项(包括ClientProxyFactory@Client())也可以使用。您可以在此处阅读有关它们的信息。

使用@Client()装饰器如下所示:


@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer'
    }
  }
})
client: ClientKafka;

消息模式#

Kafka微服务消息模式利用两个主题进行请求和回复通道。ClientKafka#send()方法通过将关联ID、回复主题和回复分区与请求消息相关联,发送带有返回地址的消息。这要求在发送消息之前,ClientKafka实例订阅回复主题并分配到至少一个分区。

随后,您需要为每个运行的Nest应用程序至少分配一个回复主题分区。例如,如果您运行了4个Nest应用程序,但回复主题只有3个分区,则在尝试发送消息时,其中1个Nest应用程序将出现错误。

当启动新的ClientKafka实例时,它们会加入消费者组并订阅其各自的主题。这个过程会触发为消费者组的消费者分配主题分区的重新平衡。

通常,主题分区是使用循环分区器进行分配的,它将主题分区分配给按消费者名称排序的消费者集合,这些名称在应用程序启动时随机设置。然而,当新的消费者加入消费者组时,新的消费者可以位于消费者集合的任何位置。这会创建一个条件,在这种情况下,预先存在的消费者在新消费者之后定位。结果是,在重新平衡之前发送的请求的响应消息会丢失,因为预先存在的消费者被分配了不同的分区。

为了防止ClientKafka消费者丢失响应消息,使用了Nest特定的内置自定义分区器。这个自定义分区器根据在应用程序启动时设置的高分辨率时间戳(process.hrtime())将分区分配给按时间排序的消费者集合。

消息响应订阅#

注意 此部分仅适用于您使用请求-响应消息样式(使用@MessagePattern装饰器和ClientKafka#send方法)的情况。订阅响应主题对于基于事件通信(使用@EventPattern装饰器和ClientKafka#emit方法)是不必要的。

ClientKafka类提供了subscribeToResponseOf()方法。subscribeToResponseOf()方法接受一个请求的主题名称作为参数,并将派生的回复主题名称添加到回复主题的集合中。在实现消息模式时,需要使用此方法。

heroes.controller.ts
JS TS

onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
}

如果ClientKafka实例是异步创建的,则必须在调用connect()方法之前调用subscribeToResponseOf()方法。

heroes.controller.ts
JS TS

async onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
  await this.client.connect();
}

传入消息#

Nest将传入的Kafka消息作为一个具有keyvalueheaders属性的对象进行接收,这些属性的值为Buffer类型。然后,Nest通过将缓冲区转换为字符串来解析这些值。如果字符串类似于一个“对象”,Nest会尝试将该字符串解析为JSON。然后,value会传递给其关联的处理程序。

传出消息#

Nest在发布事件或发送消息时,在对参数进行序列化处理后发送出去的Kafka消息。这发生在传递给ClientKafkaemit()send()方法的参数上,或者从@MessagePattern方法返回的值上。这种序列化会使用JSON.stringify()toString()原型方法将非字符串或缓冲区的对象“字符串化”。

heroes.controller.ts
JS TS

@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId;
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];
    return items;
  }
}
提示@Payload()是从@nestjs/microservices导入的。

通过传递一个具有keyvalue属性的对象,也可以对传出的消息进行分区键化(Keying)。对消息进行分区键化对于满足共同分区要求是很重要的。

heroes.controller.ts
JS TS

@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  }
}

此外,以这种格式传递的消息还可以包含在headers散列属性中设置的自定义头。头散列属性值必须是string类型或Buffer类型。

heroes.controller.ts
JS TS

@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        kafka_nestRealm: realm
      },
      key: heroId,
      value: items
    }
  }
}

基于事件#

虽然请求-响应方法在服务之间交换消息时是理想的,但当您的消息风格是基于事件(而基于事件又是Kafka的理想应用场景)时,它可能不太适用——您只想发布事件而无需等待响应。在这种情况下,您不希望为维护两个主题所需的请求-响应开销。

查看这两个部分以了解更多信息:概述:基于事件概述:发布事件

上下文#

在更复杂的场景中,您可能需要访问有关传入请求的更多信息。当使用Kafka传输器时,您可以访问KafkaContext对象。

JS TS

@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  console.log(`Topic: ${context.getTopic()}`);
}

@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
  console.log(`Topic: ${context.getTopic()}`);
}
提示@Payload()@Ctx()KafkaContext 是从 @nestjs/microservices 包中导入的。

要访问原始的Kafka IncomingMessage对象,请使用KafkaContext对象的getMessage()方法,如下所示:

JS TS

@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

其中IncomingMessage满足以下接口:


interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

如果您的处理程序对于每条接收到的消息需要较长的处理时间,您应该考虑使用heartbeat回调。要检索heartbeat函数,请使用KafkaContextgetHeartbeat()方法,如下所示:

JS TS

@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // Do some slow processing
  await doWorkPart1();

  // Send heartbeat to not exceed the sessionTimeout
  await heartbeat();

  // Do some slow processing again
  await doWorkPart2();
}

命名约定#

Kafka微服务组件在client.clientIdconsumer.groupId选项后附加了它们各自角色的描述,以防止Nest微服务客户端和服务器组件之间的冲突。默认情况下,ClientKafka组件将-client附加到client.clientId,而ServerKafka组件将-server附加到consumer.groupId。请注意,下面提供的值是如何通过这种方式转换的(如注释中所示)。

main.ts
JS TS

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
});

对于客户端:

heroes.controller.ts
JS TS

@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-client
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-client
    }
  }
})
client: ClientKafka;
提示 Kafka客户端和消费者的命名约定可以通过扩展ClientKafkaKafkaServer来进行自定义,您可以在自己的自定义提供程序中覆盖构造函数。

由于Kafka微服务消息模式使用两个主题来进行请求和回复通道,因此应从请求主题派生回复模式。默认情况下,回复主题的名称是将请求主题名称与.reply附加在一起组成的。

heroes.controller.ts
JS TS

onModuleInit() {
  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
提示 Kafka回复主题的命名约定可以通过扩展ClientKafka来进行自定义,您可以在自己的自定义提供程序中覆盖getResponsePatternName方法。

可重试异常#

与其他传输器类似,所有未处理的异常都会自动包装成RpcException并转换为“用户友好”的格式。但是,在某些边缘情况下,您可能希望绕过此机制,让异常被kafkajs驱动程序消费。在处理消息时抛出异常会指示kafkajs重试它(重新传递它),这意味着即使消息(或事件)处理程序被触发,偏移量也不会提交到Kafka。

警告 对于事件处理程序(基于事件的通信),默认情况下,所有未处理的异常都被视为可重试的异常

为此,您可以使用一个名为KafkaRetriableException的专用类,如下所示:


throw new KafkaRetriableException('...');
提示KafkaRetriableException类是从@nestjs/microservices包中导出的。

提交偏移量#

在使用Kafka时,提交偏移量是至关重要的。默认情况下,消息将在特定时间后自动提交。要获取更多信息,请访问KafkaJS文档ClientKafka提供了一种手动提交偏移量的方式,其工作方式类似于原生KafkaJS实现

JS TS

@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // business logic

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  await this.client.commitOffsets([{ topic, partition, offset }])
}

@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
  // business logic

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  await this.client.commitOffsets([{ topic, partition, offset }])
}

要禁用消息的自动提交,在run配置中设置autoCommit: false,如下所示:

main.ts
JS TS

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});

const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});

支持一下