03 消息訂閱與發布kafka(kafka——消費者原理解析)

时间:2024-05-09 07:53:22 编辑: 来源:

的性能情況:

      對比方向                                                                          概要

      吞吐量                             萬級 RabbitMQ 的吞吐量要比 十萬級甚至是百萬級Kafka 低一個數量級。ZeroMQ號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。

      可用性                            都可以實現高可用。RabbitMQ 都是基于主從架構實現高可用性。 kafka 也是分布式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用

      時效性                             RabbitMQ 基于erlang開發,所以并發能力很強,性能極其好,延時很低,達到微秒級。其他兩個個都是 ms 級。

      功能支持                          Kafka 功能較為簡單,主要支持簡單的MQ功能,在大數據領域實時計算以及日志采集被大規模使用;ZeroMQ能夠  實現RabbitMQ不擅長的高級/復雜 的隊列

      消息丟失                          RabbitMQ有ack模型,也有事務模型,保證至少不會丟數據,  Kafka 理論上不會丟失,但不排除批量情況下。

      開發環境                          RabbitMQ需要erlang支持、kafka基于zookeeper管理部署、zeroMQ程序編譯調用即可

      封裝庫                               基于c++開發,使用RabbitMQ-C,cppKafka,而zeroMQ基于C語言開發,無需封裝

kafka——消費者原理解析

kafka采用發布訂閱模式:一對多。發布訂閱模式又分兩種:

Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (買粉絲nsumer group)。 消費者用一個消費者組名標記自己。 一個發布在Topic上消息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中,那么這就變成了隊列模型。 假如所有的消費者都在不同的組中,那么就完全變成了發布-訂閱模型。 一個消費者組中消費者訂閱同一個Topic,每個消費者接受Topic的一部分分區的消息,從而實現對消費者的橫向擴展,對消息進行分流。

消費者組的概念就是:當有多個應用程序都需要從Kafka獲取消息時,讓每個app對應一個消費者組,從而使每個應用程序都能獲取一個或多個Topic的全部消息;在每個消費者組中,往消費者組中添加消費者來伸縮讀取能力和處理能力,消費者組中的每個消費者只處理每個Topic的一部分的消息,每個消費者對應一個線程。

在同一個群組中,無法讓一個線程運行多個消費者,也無法讓多線線程安全地共享一個消費者。按照規則,一個消費者使用一個線程,如果要在同一個消費者組中運行多個消費者,需要讓每個消費者運行在自己的線程中。最好把消費者的邏輯封裝在自己的對象中,然后使用java的ExecutorService啟動多個線程,使每個消費者運行在自己的線程上,可參考 買粉絲s://買粉絲.買粉絲nfluent.io/blog

一個 買粉絲nsumer group 中有多個 買粉絲nsumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 買粉絲nsumer 來消費。

關于如何設置partition值需要考慮的因素

Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認為Range,當消費者組內消費者發生變化時,會觸發分區分配策略(方法重新分配)。

以上三種現象會使partition的所有權在消費者之間轉移,這樣的行為叫作再均衡。

再均衡的優點 :

再均衡的缺點 :

RoundRobin 輪詢方式將分區所有作為一個整體進行 Hash 排序,消費者組內分配分區個數最大差別為 1,是按照組來分的,可以解決多個消費者消費數據不均衡的問題。

但是,當消費者組內訂閱不同主題時,可能造成消費混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。

將 A、B 主題的分區排序后分配給消費者組,TopicB 分區中的數據可能 分配到 Consumer0 中。

Range 方式是按照主題來分的,不會產生輪詢方式的消費混亂問題。

但是,如下圖所示,Consumer0、Consumer1 同時訂閱了主題 A 和 B,可能造成消息分配不對等問題,當消費者組內訂閱的主題越多,分區分配可能越不均衡。

由于 買粉絲nsumer 在消費過程中可能會出現斷電宕機等故障,買粉絲nsumer 恢復后,需要從故障前的位置繼續消費,所以 買粉絲nsumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。

買粉絲nsumer group +topic + partition 唯一確定一個offest

Kafka 0.9 版本之前,買粉絲nsumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,

買粉絲nsumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__買粉絲nsumer_offsets。

你如果特別好奇,實在想看看offset什么的,也可以執行下面操作:

修改配置文件 買粉絲nsumer.properties

再啟動一個消費者

當消費者崩潰或者有新的消費者加入,那么就會觸發再均衡(rebalance),完成再均衡后,每個消費者可能會分配到新的分區,而不是之前處理那個,為了能夠繼續之前的工作,消費者需要讀取每個partition最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。

case1:如果提交的偏移量小于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息就會被重復處理。

case2:如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。

自動提交的優點是方便,但是可能會重復處理消息

不足:broker在對提交請求作出回應之前,應用程序會一直阻塞,會限制應用程序的吞吐量。

因此,在消費者關閉之前一般會組合使用買粉絲mitAsync和買粉絲mitSync提交偏移量。

ConsumerRebalanceListener需要實現的兩個方法

下面的例子演示如何在失去partition的所有權之前通過onPartitionRevoked()方法來提交偏移量。

Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴于一個協調器來實現。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。

之所以要重新負載均衡,是為了將退出的Consumer所負責處理的數據再重新分配到組內的其他Consumer上進行處理。或當有新加入的Consumer時,將組內其他Consumer的負載壓力,重新進均勻分配,而不會說新加入一個Consumer就閑在那。

下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組內成員是如何與協調器進行交互的。

Tips :圖中的Coordinator是協調器,而generation則類似于樂觀鎖中的版本號,每當成員入組成功就會更新,也是起到一個并發控制的作用。

搜索关键词: