Kafka
Kafka 是一个开源的分布式流处理平台,具有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息系统。
- 以容错持久的方式存储记录流。
- 在记录发生时处理记录流。
Kafka 项目旨在为处理实时数据流提供统一的、高吞吐量、低延迟的平台。它与 Apache Storm 和 Spark 非常好地集成,用于实时流式数据分析。
安装#
要开始构建基于Kafka的微服务,首先安装所需的包:
$ npm i --save kafkajs
概述#
与其他Nest微服务传输层实现类似,您可以使用传递给createMicroservice()
方法的选项对象的transport
属性来选择Kafka传输机制,以及一个可选的options
属性,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
提示Transport
枚举是从@nestjs/microservices
包中导入的。
选项#
options
属性是特定于所选择的传输器的。Kafka传输器公开了如下所述的属性。
client | 客户端配置选项 (详细阅读 在这里) |
consumer | 消费者配置选项 (详细阅读 在这里) |
run | 运行配置选项 (详细阅读 在这里) |
subscribe | 订阅配置选项 (详细阅读 在这里) |
producer | 生产者配置选项 (详细阅读 在这里) |
send | 发送配置选项 (详细阅读 在这里) |
producerOnlyMode | 跳过消费者组注册,仅充当生产者的特性标志 (boolean ) |
postfixId | 更改clientId值的后缀(string ) |
客户端#
在Kafka中,与其他微服务传输器相比,有一个小差异。我们使用ClientKafka
类,而不是ClientProxy
类。
与其他微服务传输器一样,您有多种选项可以创建ClientKafka
实例。
创建实例的一种方法是使用ClientsModule
。要使用ClientsModule
创建客户端实例,导入它并使用register()
方法传递一个选项对象,其中包含与上述createMicroservice()
方法中显示的相同属性,以及一个用作注入令牌的name
属性。在此处了解更多关于ClientsModule
的信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
创建客户端的其他选项(包括ClientProxyFactory
或@Client()
)也可以使用。您可以在此处阅读有关它们的信息。
使用@Client()
装饰器如下所示:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafka;
消息模式#
Kafka微服务消息模式利用两个主题进行请求和回复通道。ClientKafka#send()
方法通过将关联ID、回复主题和回复分区与请求消息相关联,发送带有返回地址的消息。这要求在发送消息之前,ClientKafka
实例订阅回复主题并分配到至少一个分区。
随后,您需要为每个运行的Nest应用程序至少分配一个回复主题分区。例如,如果您运行了4个Nest应用程序,但回复主题只有3个分区,则在尝试发送消息时,其中1个Nest应用程序将出现错误。
当启动新的ClientKafka
实例时,它们会加入消费者组并订阅其各自的主题。这个过程会触发为消费者组的消费者分配主题分区的重新平衡。
通常,主题分区是使用循环分区器进行分配的,它将主题分区分配给按消费者名称排序的消费者集合,这些名称在应用程序启动时随机设置。然而,当新的消费者加入消费者组时,新的消费者可以位于消费者集合的任何位置。这会创建一个条件,在这种情况下,预先存在的消费者在新消费者之后定位。结果是,在重新平衡之前发送的请求的响应消息会丢失,因为预先存在的消费者被分配了不同的分区。
为了防止ClientKafka
消费者丢失响应消息,使用了Nest特定的内置自定义分区器。这个自定义分区器根据在应用程序启动时设置的高分辨率时间戳(process.hrtime()
)将分区分配给按时间排序的消费者集合。
消息响应订阅#
注意 此部分仅适用于您使用请求-响应消息样式(使用@MessagePattern
装饰器和ClientKafka#send
方法)的情况。订阅响应主题对于基于事件通信(使用@EventPattern
装饰器和ClientKafka#emit
方法)是不必要的。
ClientKafka
类提供了subscribeToResponseOf()
方法。subscribeToResponseOf()
方法接受一个请求的主题名称作为参数,并将派生的回复主题名称添加到回复主题的集合中。在实现消息模式时,需要使用此方法。
onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
}
如果ClientKafka
实例是异步创建的,则必须在调用connect()
方法之前调用subscribeToResponseOf()
方法。
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
await this.client.connect();
}
传入消息#
Nest将传入的Kafka消息作为一个具有key
、value
和headers
属性的对象进行接收,这些属性的值为Buffer
类型。然后,Nest通过将缓冲区转换为字符串来解析这些值。如果字符串类似于一个“对象”,Nest会尝试将该字符串解析为JSON
。然后,value
会传递给其关联的处理程序。
传出消息#
Nest在发布事件或发送消息时,在对参数进行序列化处理后发送出去的Kafka消息。这发生在传递给ClientKafka
的emit()
和send()
方法的参数上,或者从@MessagePattern
方法返回的值上。这种序列化会使用JSON.stringify()
或toString()
原型方法将非字符串或缓冲区的对象“字符串化”。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return items;
}
}
提示@Payload()
是从@nestjs/microservices
导入的。
通过传递一个具有key
和value
属性的对象,也可以对传出的消息进行分区键化(Keying)。对消息进行分区键化对于满足共同分区要求是很重要的。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return {
headers: {
realm
},
key: heroId,
value: items
}
}
}
此外,以这种格式传递的消息还可以包含在headers
散列属性中设置的自定义头。头散列属性值必须是string
类型或Buffer
类型。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest';
const heroId = message.heroId;
const dragonId = message.dragonId;
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
];
return {
headers: {
kafka_nestRealm: realm
},
key: heroId,
value: items
}
}
}
基于事件#
虽然请求-响应方法在服务之间交换消息时是理想的,但当您的消息风格是基于事件(而基于事件又是Kafka的理想应用场景)时,它可能不太适用——您只想发布事件而无需等待响应。在这种情况下,您不希望为维护两个主题所需的请求-响应开销。
查看这两个部分以了解更多信息:概述:基于事件 和 概述:发布事件。
上下文#
在更复杂的场景中,您可能需要访问有关传入请求的更多信息。当使用Kafka传输器时,您可以访问KafkaContext
对象。
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
console.log(`Topic: ${context.getTopic()}`);
}
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
console.log(`Topic: ${context.getTopic()}`);
}
提示@Payload()
、@Ctx()
和KafkaContext
是从@nestjs/microservices
包中导入的。
要访问原始的Kafka IncomingMessage
对象,请使用KafkaContext
对象的getMessage()
方法,如下所示:
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
其中IncomingMessage
满足以下接口:
interface IncomingMessage {
topic: string;
partition: number;
timestamp: string;
size: number;
attributes: number;
offset: string;
key: any;
value: any;
headers: Record<string, any>;
}
如果您的处理程序对于每条接收到的消息需要较长的处理时间,您应该考虑使用heartbeat
回调。要检索heartbeat
函数,请使用KafkaContext
的getHeartbeat()
方法,如下所示:
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const heartbeat = context.getHeartbeat();
// Do some slow processing
await doWorkPart1();
// Send heartbeat to not exceed the sessionTimeout
await heartbeat();
// Do some slow processing again
await doWorkPart2();
}
命名约定#
Kafka微服务组件在client.clientId
和consumer.groupId
选项后附加了它们各自角色的描述,以防止Nest微服务客户端和服务器组件之间的冲突。默认情况下,ClientKafka
组件将-client
附加到client.clientId
,而ServerKafka
组件将-server
附加到consumer.groupId
。请注意,下面提供的值是如何通过这种方式转换的(如注释中所示)。
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-server
},
}
});
对于客户端:
heroes.controller.ts
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-client
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-client
}
}
})
client: ClientKafka;
提示 Kafka客户端和消费者的命名约定可以通过扩展ClientKafka
和KafkaServer
来进行自定义,您可以在自己的自定义提供程序中覆盖构造函数。
由于Kafka微服务消息模式使用两个主题来进行请求和回复通道,因此应从请求主题派生回复模式。默认情况下,回复主题的名称是将请求主题名称与.reply
附加在一起组成的。
onModuleInit() {
this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
提示 Kafka回复主题的命名约定可以通过扩展ClientKafka
来进行自定义,您可以在自己的自定义提供程序中覆盖getResponsePatternName
方法。
可重试异常#
与其他传输器类似,所有未处理的异常都会自动包装成RpcException
并转换为“用户友好”的格式。但是,在某些边缘情况下,您可能希望绕过此机制,让异常被kafkajs
驱动程序消费。在处理消息时抛出异常会指示kafkajs
重试它(重新传递它),这意味着即使消息(或事件)处理程序被触发,偏移量也不会提交到Kafka。
警告 对于事件处理程序(基于事件的通信),默认情况下,所有未处理的异常都被视为可重试的异常。
为此,您可以使用一个名为KafkaRetriableException
的专用类,如下所示:
throw new KafkaRetriableException('...');
提示KafkaRetriableException
类是从@nestjs/microservices
包中导出的。
提交偏移量#
在使用Kafka时,提交偏移量是至关重要的。默认情况下,消息将在特定时间后自动提交。要获取更多信息,请访问KafkaJS文档。ClientKafka
提供了一种手动提交偏移量的方式,其工作方式类似于原生KafkaJS实现。
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
// business logic
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }])
}
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
// business logic
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }])
}
要禁用消息的自动提交,在run
配置中设置autoCommit: false
,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false
}
}
});
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false
}
}
});