2020年9月18日星期五

.NET Core 下使用 Kafka

安装

CentOS 安装 kafka

  • Kafka 下载地址: https://kafka.apache.org/downloads
  • ZooKeeper 下载地址: https://zookeeper.apache.org/releases.html

下载并解压

# 下载 Kafka,并解压
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1 /data/kafka    # 注意:移动解压出来的目录,而不是 .tgz 压缩包
# 下载 ZooKeeper,解压
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper

启动 ZooKeeper

# 复制配置模版(zoo_sample.cfg 在 ZooKeeper 目录下,不在 Kafka 目录)
$ cd /data/zookeeper
$ cp conf/zoo_sample.cfg conf/zoo.cfg
# 看看配置需不需要改
$ vim conf/zoo.cfg
# 命令
$ ./bin/zkServer.sh start   # 启动
$ ./bin/zkServer.sh status  # 状态
$ ./bin/zkServer.sh stop    # 停止
$ ./bin/zkServer.sh restart # 重启
# 使用客户端测试
$ ./bin/zkCli.sh -server localhost:2181
$ quit

启动 Kafka

# 备份配置
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy
# 修改配置
$ vim /data/kafka/config/server.properties
# 集群配置下,每个 broker 的 id 是必须不同的
# broker.id=0
# 监听地址设置(内网)
# listeners=PLAINTEXT://ip:9092
# 对外提供服务的IP、端口
# advertised.listeners=PLAINTEXT://106.75.84.97:9092
# 修改每个 topic 的默认分区参数 num.partitions,默认是 1,具体合适的取值需要根据服务器配置确定,UCloud.ukafka = 3
# num.partitions=3
# zookeeper 配置
# zookeeper.connect=localhost:2181
# 通过配置启动 kafka
$ ./bin/kafka-server-start.sh config/server.properties &
# 状态查看
$ ps -ef | grep kafka
$ jps

docker下安装Kafka

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

介绍

  • Broker:消息中间件处理节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

kafka partition 和 consumer 数目关系

  • 如果consumer比partition多是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。
  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。
  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

快速开始

在 .NET Core 项目中安装组件

Install-Package Confluent.Kafka

开源地址:https://github.com/confluentinc/confluent-kafka-dotnet

添加IKafkaService服务接口

public interface IKafkaService{ /// <summary> /// 发送消息至指定主题 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topicName"></param> /// <param name="message"></param> /// <returns></returns> Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class; /// <summary> /// 从指定主题订阅消息 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topics"></param> /// <param name="messageFunc"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;}

实现IKafkaService

/// <summary>
/// Default <see cref="IKafkaService"/> implementation backed by Confluent.Kafka,
/// producing/consuming JSON string payloads against a local broker.
/// </summary>
public class KafkaService : IKafkaService
{
    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topicName"/> as JSON,
    /// keyed by a fresh GUID.
    /// </summary>
    public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092"
        };
        // NOTE(review): a producer is built (and torn down) per call; Confluent
        // recommends reusing one producer instance — consider caching if this
        // path is hot. TODO confirm call frequency with callers.
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = message.SerializeToJson()
        });
    }

    /// <summary>
    /// Consume loop: reads from <paramref name="topics"/>, deserializes each payload
    /// into <typeparamref name="TMessage"/>, invokes <paramref name="messageFunc"/>,
    /// and commits the offset manually after successful processing (at-least-once).
    /// Runs until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            GroupId = "crow-consumer",
            EnableAutoCommit = false,        // offsets committed manually below
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true        // surfaces IsPartitionEOF results
        };
        //const int commitPeriod = 5;
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) =>
            {
                Console.WriteLine($"Error: {e.Reason}");
            })
            .SetStatisticsHandler((_, json) =>
            {
                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
            })
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
            })
            .Build();
        consumer.Subscribe(topics);
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
                    if (consumeResult.IsPartitionEOF)
                    {
                        // FIX: in the original this interpolated string was broken
                        // across a physical line break, which does not compile;
                        // reassembled onto a single line.
                        Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }
                    TMessage messageResult = null;
                    try
                    {
                        messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
                    }
                    catch (Exception ex)
                    {
                        // Deserialization failures are logged and skipped; note the
                        // offset is then never committed, so the message will be
                        // redelivered after a rebalance/restart.
                        var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
                        Console.WriteLine(errorMessage);
                        messageResult = null;
                    }
                    if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
                    {
                        messageFunc(messageResult);
                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cooperative shutdown: Close() commits final offsets and leaves the group cleanly.
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
        await Task.CompletedTask;
    }
}

注入IKafkaService,在需要使用的地方直接调用即可。

public class MessageService : IMessageService, ITransientDependency{ private readonly IKafkaService _kafkaService; public MessageService(IKafkaService kafkaService) {  _kafkaService = kafkaService; } public async Task RequestTraceAdded(XxxEventData eventData) {  await _kafkaService.PublishAsync(eventData.TopicName, eventData); }}

以上相当于一个生产者,当我们消息队列发出后,还需一个消费者进行消费,所以可以使用一个控制台项目接收消息来处理业务。

var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) =>{ e.Cancel = true; cts.Cancel();};await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>{ // Your logic Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");}, cts.Token);

IKafkaService中已经写了订阅消息的接口,这里也是注入后直接使用即可。

生产者消费者示例

生产者

static async Task Main(string[] args){ if (args.Length != 2) {  Console.WriteLine("Usage: .. brokerList topicName");  // 127.0.0.1:9092 helloTopic  return; } var brokerList = args.First(); var topicName = args.Last(); var config = new ProducerConfig { BootstrapServers = brokerList }; using var producer = new ProducerBuilder<string, string>(config).Build(); Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); Console.WriteLine("-----------------------------------------------------------------------"); Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:"); Console.WriteLine("> key value<Enter>"); Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:"); Console.WriteLine("> value<enter>"); Console.WriteLine("Ctrl-C to quit.\n"); var cancelled = false; Console.CancelKeyPress += (_, e) => {  e.Cancel = true;  cancelled = true; }; while (!cancelled) {  Console.Write("> ");  var text = string.Empty;  try  {   text = Console.ReadLine();  }  catch (IOException)  {   break;  }  if (string.IsNullOrWhiteSpace(text))  {   break;  }  var key = string.Empty;  var val = text;  var index = text.IndexOf(" ");  if (index != -1)  {   key = text.Substring(0, index);   val = text.Substring(index + 1);  }  try  {   var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>   {    Key = key,    Value = val   });   Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");  }  catch (ProduceException<string, string> e)  {   Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");  } }}

消费者

/// <summary>
/// Console Kafka consumer: subscribes to one topic and prints every message,
/// committing offsets manually every few messages. Usage: &lt;brokerList&gt; &lt;topicName&gt;.
/// </summary>
static void Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args.First();
    var topicName = args.Last();

    Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

    // Ctrl-C cancels the token so the consume loop can shut down cleanly.
    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = brokerList,
        GroupId = "consumer",
        EnableAutoCommit = false,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true
    };

    // Commit roughly every N messages rather than on every consume.
    const int commitPeriod = 5;

    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
        .SetErrorHandler((_, err) => Console.WriteLine($"Error: {err.Reason}"))
        .SetStatisticsHandler((_, json) =>
        {
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
            //Console.WriteLine($"Statistics: {json}");
        })
        .SetPartitionsAssignedHandler((client, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
        })
        .SetPartitionsRevokedHandler((client, partitions) =>
        {
            Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
        })
        .Build();

    consumer.Subscribe(topicName);

    try
    {
        while (true)
        {
            try
            {
                var result = consumer.Consume(cts.Token);

                if (result.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {result.Topic}, partition {result.Partition}, offset {result.Offset}.");
                    continue;
                }

                Console.WriteLine($"Received message at {result.TopicPartitionOffset}: {result.Message.Value}");

                if (result.Offset % commitPeriod == 0)
                {
                    try
                    {
                        consumer.Commit(result);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Closing consumer.");
        consumer.Close();
    }
}

.NET Core 下使用 Kafka
isbn爆款打造海带宝开开眼!这4个亚马逊联盟网站必须安排数据分析- 数据透视表 (结合CPC广告搜索词报告分析)亚马逊"退出中国"的真相与原因亚马逊低价政策暗潮涌动,卖家还有利可图吗?[lisiting干货]亚马逊如何批量上传?如何批量管理listing?亚马逊t恤ebay客服技巧与实战建议eori疫情过后亚马逊卖家如何恢复业务,为primeday做准备?海外网红KOL营销_TikTok广告投放亚马逊新手易犯的5个选品误区,你中招了吗?新卖家收藏!亚马逊常用的分类编码及解析!单手就能打爆新品的亚马逊运营,都是如何做到的?75%的邮费补贴,Wish卖家千万别错过!

没有评论:

发表评论