电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術(shù)文章
文章詳情頁

Java kafka如何實現(xiàn)自定義分區(qū)類和攔截器

瀏覽:46日期:2022-08-31 13:14:07

生產(chǎn)者發(fā)送到對應(yīng)的分區(qū)有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應(yīng)的java api, 有多種參數(shù))

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動實現(xiàn)分布:

1、實現(xiàn)一個自定義分區(qū)類,CustomPartitioner實現(xiàn)Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當(dāng)前的發(fā)送的topic * @param key 當(dāng)前的key值 * @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組 * @param value 當(dāng)前的value值 * @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據(jù)返回值就是分區(qū)號, 這邊就是固定發(fā)送到三號分區(qū) return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區(qū)類

// 具體的分區(qū)類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現(xiàn)消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個是消息發(fā)送之前進行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創(chuàng)建一個新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計數(shù)攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計數(shù)過濾器不對消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計成功和失敗的次數(shù) System.out.println('CounterInterceptor過濾器執(zhí)行統(tǒng)計失敗和成功數(shù)量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務(wù)端的主機名和端口號 props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節(jié)點的應(yīng)答 props.put('acks', 'all'); // 消息發(fā)送最大嘗試次數(shù) props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請求延時,可能生產(chǎn)數(shù)據(jù)太快了 props.put('linger.ms', 1); // 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū) props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區(qū)類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調(diào)函數(shù)');} }); } /*System.out.println('現(xiàn)在執(zhí)行關(guān)閉producer'); producer.close();*/ producer.close(); }}

總結(jié),我們可以知道攔截器鏈各個方法的執(zhí)行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法

(2)執(zhí)行A的onSend方法,B的onSend方法

(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。

(5)執(zhí)行A的close方法,B的close方法。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 河南橡胶接头厂家,河南波纹补偿器厂家,河南可曲挠橡胶软连接,河南套筒补偿器厂家-河南正大阀门 | 数字展示在线_数字展示行业门户网站 | 新疆乌鲁木齐网站建设-乌鲁木齐网站制作设计-新疆远璨网络 | 可程式恒温恒湿试验箱|恒温恒湿箱|恒温恒湿试验箱|恒温恒湿老化试验箱|高低温试验箱价格报价-广东德瑞检测设备有限公司 | POS机办理_个人pos机免费领取-银联pos机申请首页 | 数控专用机床,专用机床,自动线,组合机床,动力头,自动化加工生产线,江苏海鑫机床有限公司 | 新材料分散-高速均质搅拌机-超声波分散混合-上海化烁智能设备有限公司 | 超声波清洗机-超声波清洗设备定制生产厂家 - 深圳市冠博科技实业有限公司 | 旋振筛_不锈钢旋振筛_气旋筛_旋振筛厂家—新乡市大汉振动机械有限公司 | 塑钢课桌椅、学生课桌椅、课桌椅厂家-学仕教育设备首页 | 传递窗_超净|洁净工作台_高效过滤器-传递窗厂家广州梓净公司 | 成都租车_成都租车公司_成都租车网_众行宝 | 耐火浇注料-喷涂料-浇注料生产厂家_郑州市元领耐火材料有限公司 耐力板-PC阳光板-PC板-PC耐力板 - 嘉兴赢创实业有限公司 | 浙江寺庙设计-杭州寺院设计-宁波寺庙规划_汉匠| 低压载波电能表-单相导轨式电能表-华邦电力科技股份有限公司-智能物联网综合管理平台 | 知名电动蝶阀,电动球阀,气动蝶阀,气动球阀生产厂家|价格透明-【固菲阀门官网】 | 超声波流量计_流量标准装置生产厂家 _河南盛天精密测控 | 河北凯普威医疗器材有限公司,高档轮椅系列,推车系列,座厕椅系列,协步椅系列,拐扙系列,卫浴系列 | Maneurop/美优乐压缩机,活塞压缩机,型号规格,技术参数,尺寸图片,价格经销商 | 一体化预制泵站-一体化提升泵站-一体化泵站厂家-山东康威环保 | 安平县鑫川金属丝网制品有限公司,声屏障,高速声屏障,百叶孔声屏障,大弧形声屏障,凹凸穿孔声屏障,铁路声屏障,顶部弧形声屏障,玻璃钢吸音板 | 喷涂流水线,涂装流水线,喷漆流水线-山东天意设备科技有限公司 | 雄松华章(广州华章MBA)官网-专注MBA/MPA/MPAcc/MEM辅导培训 | ?水马注水围挡_塑料注水围挡_防撞桶-常州瑞轩水马注水围挡有限公司 | 主题班会网 - 安全教育主题班会,各类主题班会PPT模板 | 宝元数控系统|对刀仪厂家|东莞机器人控制系统|东莞安川伺服-【鑫天驰智能科技】 | 电动卫生级调节阀,电动防爆球阀,电动软密封蝶阀,气动高压球阀,气动对夹蝶阀,气动V型调节球阀-上海川沪阀门有限公司 | 拼装地板,悬浮地板厂家,悬浮式拼装运动地板-石家庄博超地板科技有限公司 | 圆形振动筛_圆筛_旋振筛_三次元振动筛-河南新乡德诚生产厂家 | 全国冰箱|空调|洗衣机|热水器|燃气灶维修服务平台-百修家电 | 钢制拖链生产厂家-全封闭钢制拖链-能源钢铝拖链-工程塑料拖链-河北汉洋机械制造有限公司 | 钢格栅板_钢格板网_格栅板-做专业的热镀锌钢格栅板厂家-安平县迎瑞丝网制造有限公司 | 塑钢课桌椅、学生课桌椅、课桌椅厂家-学仕教育设备首页 | 消防泵-XBD单级卧式/立式消防泵-上海塑泉泵阀(集团)有限公司 | 智能化的检漏仪_气密性测试仪_流量测试仪_流阻阻力测试仪_呼吸管快速检漏仪_连接器防水测试仪_车载镜头测试仪_奥图自动化科技 | 电缆接头_防水接头_电缆防水接头 - 乐清市新豪电气有限公司 | 编织人生 - 权威手工编织网站,编织爱好者学习毛衣编织的门户网站,织毛衣就上编织人生网-编织人生 | 「钾冰晶石」氟铝酸钾_冰晶石_氟铝酸钠「价格用途」-亚铝氟化物厂家 | 防渗土工膜|污水处理防渗膜|垃圾填埋场防渗膜-泰安佳路通工程材料有限公司 | _网名词典_网名大全_qq网名_情侣网名_个性网名 | 磨煤机配件-高铬辊套-高铬衬板-立磨辊套-盐山县宏润电力设备有限公司 |