电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Kafka Java Producer代碼實(shí)例詳解

瀏覽:90日期:2022-08-31 15:45:44

根據(jù)業(yè)務(wù)需要可以使用Kafka提供的Java Producer API進(jìn)行產(chǎn)生數(shù)據(jù),并將產(chǎn)生的數(shù)據(jù)發(fā)送到Kafka對(duì)應(yīng)Topic的對(duì)應(yīng)分區(qū)中,入口類為:Producer

Kafka的Producer API主要提供下列三個(gè)方法:

public void send(KeyedMessage<K,V> message) 發(fā)送單條數(shù)據(jù)到Kafka集群 public void send(List<KeyedMessage<K,V>> messages) 發(fā)送多條數(shù)據(jù)(數(shù)據(jù)集)到Kafka集群 public void close() 關(guān)閉Kafka連接資源

一、JavaKafkaProducerPartitioner:自定義的數(shù)據(jù)分區(qū)器,功能是:決定輸入的key/value鍵值對(duì)的message發(fā)送到Topic的那個(gè)分區(qū)中,返回分區(qū)id,范圍:[0,分區(qū)數(shù)量); 這里的實(shí)現(xiàn)比較簡(jiǎn)單,根據(jù)key中的數(shù)字決定分區(qū)的值。具體代碼如下:

import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;/** * Created by gerry on 12/21. */public class JavaKafkaProducerPartitioner implements Partitioner { /** * 無(wú)參構(gòu)造函數(shù) */ public JavaKafkaProducerPartitioner() { this(new VerifiableProperties()); } /** * 構(gòu)造函數(shù),必須給定 * * @param properties 上下文 */ public JavaKafkaProducerPartitioner(VerifiableProperties properties) { // nothings } @Override public int partition(Object key, int numPartitions) { int num = Integer.valueOf(((String) key).replaceAll('key_', '').trim()); return num % numPartitions; }}

二、 JavaKafkaProducer:通過(guò)Kafka提供的API進(jìn)行數(shù)據(jù)產(chǎn)生操作的測(cè)試類;具體代碼如下:

import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import org.apache.log4j.Logger;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.ThreadLocalRandom;/** * Created by gerry on 12/21. */public class JavaKafkaProducer { private Logger logger = Logger.getLogger(JavaKafkaProducer.class); public static final String TOPIC_NAME = 'test'; public static final char[] charts = 'qazwsxedcrfvtgbyhnujmikolp1234567890'.toCharArray(); public static final int chartsLength = charts.length; public static void main(String[] args) { String brokerList = '192.168.187.149:9092'; brokerList = '192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095'; brokerList = '192.168.187.146:9092'; Properties props = new Properties(); props.put('metadata.broker.list', brokerList); /** * 0表示不等待結(jié)果返回<br/> * 1表示等待至少有一個(gè)服務(wù)器返回?cái)?shù)據(jù)接收標(biāo)識(shí)<br/> * -1表示必須接收到所有的服務(wù)器返回標(biāo)識(shí),及同步寫(xiě)入<br/> * */ props.put('request.required.acks', '0'); /** * 內(nèi)部發(fā)送數(shù)據(jù)是異步還是同步 * sync:同步, 默認(rèn) * async:異步 */ props.put('producer.type', 'async'); /** * 設(shè)置序列化的類 * 可選:kafka.serializer.StringEncoder * 默認(rèn):kafka.serializer.DefaultEncoder */ props.put('serializer.class', 'kafka.serializer.StringEncoder'); /** * 設(shè)置分區(qū)類 * 根據(jù)key進(jìn)行數(shù)據(jù)分區(qū) * 默認(rèn)是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進(jìn)行分區(qū) * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉(zhuǎn)換為字節(jié)數(shù)組后進(jìn)行hash分區(qū) */ props.put('partitioner.class', 'JavaKafkaProducerPartitioner'); // 重試次數(shù) props.put('message.send.max.retries', '3'); // 異步提交的時(shí)候(async),并發(fā)提交的記錄數(shù) props.put('batch.num.messages', '200'); // 設(shè)置緩沖區(qū)大小,默認(rèn)10KB props.put('send.buffer.bytes', '102400'); // 2. 構(gòu)建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 構(gòu)建Producer對(duì)象 final Producer<String, String> producer = new Producer<String, String>(config); // 4. 發(fā)送數(shù)據(jù)到服務(wù)器,并發(fā)線程發(fā)送 final AtomicBoolean flag = new AtomicBoolean(true); int numThreads = 50; ExecutorService pool = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < 5; i++) { pool.submit(new Thread(new Runnable() {@Overridepublic void run() { while (flag.get()) { // 發(fā)送數(shù)據(jù) KeyedMessage message = generateKeyedMessage(); producer.send(message); System.out.println('發(fā)送數(shù)據(jù):' + message); // 休眠一下 try { int least = 10; int bound = 100; Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ' shutdown....');} }, 'Thread-' + i)); } // 5. 等待執(zhí)行完成 long sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } flag.set(false); // 6. 關(guān)閉資源 pool.shutdown(); try { pool.awaitTermination(6, TimeUnit.SECONDS); } catch (InterruptedException e) { } finally { producer.close(); // 最后之后調(diào)用 } } /** * 產(chǎn)生一個(gè)消息 * * @return */ private static KeyedMessage<String, String> generateKeyedMessage() { String key = 'key_' + ThreadLocalRandom.current().nextInt(10, 99); StringBuilder sb = new StringBuilder(); int num = ThreadLocalRandom.current().nextInt(1, 5); for (int i = 0; i < num; i++) { sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(' '); } String message = sb.toString().trim(); return new KeyedMessage(TOPIC_NAME, key, message); } /** * 產(chǎn)生一個(gè)給定長(zhǎng)度的字符串 * * @param numItems * @return */ private static String generateStringMessage(int numItems) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numItems; i++) { sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]); } return sb.toString(); }}

三、Pom.xml依賴配置如下

<properties> <kafka.version>0.8.2.1</kafka.version></properties><dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> </dependency></dependencies>

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 复合土工膜厂家|hdpe防渗土工膜|复合防渗土工布|玻璃纤维|双向塑料土工格栅-安徽路建新材料有限公司 | 泰兴市热钻机械有限公司-热熔钻孔机-数控热熔钻-热熔钻孔攻牙一体机 | 布袋除尘器-单机除尘器-脉冲除尘器-泊头市兴天环保设备有限公司 布袋除尘器|除尘器设备|除尘布袋|除尘设备_诺和环保设备 | 不锈钢电动球阀_气动高压闸阀_旋塞疏水调节阀_全立阀门-来自温州工业阀门巨头企业 | 电磁流量计厂家_涡街流量计厂家_热式气体流量计-青天伟业仪器仪表有限公司 | 苏州柯瑞德货架-仓库自动化改造解决方案 | 酶联免疫分析仪-多管旋涡混合仪|混合器-莱普特科学仪器(北京)有限公司 | 带锯机|木工带锯机圆木推台锯|跑车带锯机|河北茂业机械制造有限公司| | 周易算网-八字测算网 - 周易算网-宝宝起名取名测名字周易八字测算网 | 天津货架厂_穿梭车货架_重型仓储货架_阁楼货架定制-天津钢力仓储货架生产厂家_天津钢力智能仓储装备 | 英国公司注册-新加坡公司注册-香港公司开户-离岸公司账户-杭州商标注册-杭州优创企业 | 水成膜泡沫灭火剂_氟蛋白泡沫液_河南新乡骏华消防科技厂家 | 沈阳缠绕膜价格_沈阳拉伸膜厂家_沈阳缠绕膜厂家直销 | 数显恒温培养摇床-卧式/台式恒温培养摇床|朗越仪器 | uv固化机-丝印uv机-工业烤箱-五金蚀刻机-分拣输送机 - 保定市丰辉机械设备制造有限公司 | MES系统工业智能终端_生产管理看板/安灯/ESOP/静电监控_讯鹏科技 | 国际船舶网 - 船厂、船舶、造船、船舶设备、航运及海洋工程等相关行业综合信息平台 | 煤机配件厂家_刮板机配件_链轮轴组_河南双志机械设备有限公司 | 齿式联轴器-弹性联轴器-联轴器厂家-江苏诺兴传动联轴器制造有限公司 | 恒温恒湿箱(药品/保健品/食品/半导体/细菌)-兰贝石(北京)科技有限公司 | 户外-组合-幼儿园-不锈钢-儿童-滑滑梯-床-玩具-淘气堡-厂家-价格 | 电镀电源整流器_高频电解电源_单脉双脉冲电源 - 东阳市旭东电子科技 | 高速龙门架厂家_监控杆_多功能灯杆_信号灯杆_锂电池太阳能路灯-鑫世源照明 | 土壤有机碳消解器-石油|表层油类分析采水器-青岛溯源环保设备有限公司 | 山东led显示屏,山东led全彩显示屏,山东LED小间距屏,临沂全彩电子屏-山东亚泰视讯传媒有限公司 | 橡胶粉碎机_橡胶磨粉机_轮胎粉碎机_轮胎磨粉机-河南鼎聚重工机械制造有限公司 | 超声波乳化机-超声波分散机|仪-超声波萃取仪-超声波均质机-精浩机械|首页 | 100_150_200_250_300_350_400公斤压力空气压缩机-舰艇航天配套厂家 | 耐磨焊丝,堆焊焊丝,耐磨药芯焊丝,碳化钨焊丝-北京耐默公司 | 压力控制器,差压控制器,温度控制器,防爆压力控制器,防爆温度控制器,防爆差压控制器-常州天利智能控制股份有限公司 | 智成电子深圳tdk一级代理-提供TDK电容电感贴片蜂鸣器磁芯lambda电源代理经销,TDK代理商有哪些TDK一级代理商排名查询。-深圳tdk一级代理 | SDI车窗夹力测试仪-KEMKRAFT方向盘测试仪-上海爱泽工业设备有限公司 | 神超官网_焊接圆锯片_高速钢锯片_硬质合金锯片_浙江神超锯业制造有限公司 | 厌氧反应器,IC厌氧反应器,厌氧三相分离器-山东创博环保科技有限公司 | 精密交叉滚子轴承厂家,转盘轴承,YRT转台轴承-洛阳千协轴承 | 电动垃圾车,垃圾清运车-江苏速利达机车有限公司 | 车载加油机品牌_ 柴油加油机厂家 | 水成膜泡沫灭火剂_氟蛋白泡沫液_河南新乡骏华消防科技厂家 | 河南砖机首页-全自动液压免烧砖机,小型砌块水泥砖机厂家[十年老厂] | 山东商品混凝土搅拌楼-环保型搅拌站-拌合站-分体仓-搅拌机厂家-天宇 | 聚氨酯保温钢管_聚氨酯直埋保温管道_聚氨酯发泡保温管厂家-沧州万荣防腐保温管道有限公司 |