电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

Python如何把Spark數據寫入ElasticSearch

瀏覽:8日期:2022-07-29 14:37:53

這里以將Apache的日志寫入到ElasticSearch為例,來演示一下如何使用Python將Spark數據導入到ES中。

實際工作中,由于數據與使用框架或技術的復雜性,數據的寫入變得比較復雜,在這里我們簡單演示一下。

如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫,但Python不支持。所以首先你需要去這里下載依賴的ES官方開發的依賴包包。

下載完成后,放在本地目錄,以下面命令方式啟動pyspark:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,請設置環境變量:

export PYSPARK_PYTHON=/usr/bin/python3理解如何寫入ES的關鍵是要明白,ES是一個JSON格式的數據庫,它有一個必須的要求。數據格式必須采用以下格式

{ 'id: { the rest of your json}}

往下會展示如何轉換成這種格式。

解析Apache日志文件我們將Apache的日志文件讀入,構建Spark RDD。然后我們寫一個parse()函數用正則表達式處理每條日志,提取我們需要的字

rdd = sc.textFile('/home/ubuntu/walker/apache_logs')regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’

p=re.compile(regex)def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d

換句話說,我們剛開始從日志文件讀入RDD的數據類似如下:

[’83.149.9.216 - - [17/May/2015:10:05:03 +0000] 'GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1' 200 203023 'http://semicomplete.com/presentations/logstash-monitorama-2013/' 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36'’]

然后我們使用map函數轉換每條記錄:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{’date’: ’17/May/2015:10:05:03 +0000’, ’ip’: ’83.149.9.216’, ’operation’: ’GET’, ’uri’: ’/presentations/logstash-monitorama-2013/images/kibana-search.png’}]

現在看起來像JSON,但并不是JSON字符串,我們需要使用json.dumps將dict對象轉換。

我們同時增加一個doc_id字段作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們將這個字段作為ID。

這里我們使用SHA算法,將這個JSON字符串作為參數,得到一個唯一ID。計算結果類似如下,可以看到ID是一個很長的SHA數值。

rdd3.take(1)

[(’a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c’, ’{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'doc_id': 'a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}’)]

現在我們需要制定ES配置,比較重要的兩項是:

“es.resource” : ‘walker/apache’: 'walker'是索引,apache是類型,兩者一般合稱索引 “es.mapping.id”: “doc_id”: 告訴ES那個字段作為整個文檔的ID,也就是查詢結果中的_id

其他的配置自己去探索。

然后我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分代碼對于所有的ES都是一樣的,比較固定,不需要理解每一個細節

es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)rdd3 = rdd2.map(addID)def addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))

最后我們可以使用curl進行查詢

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*{ '_index' : 'walker', '_type' : 'apache', '_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', '_score' : 1.0, '_source' : { 'date' : '17/May/2015:10:05:32 +0000', 'ip' : '91.177.205.119', 'operation' : 'GET', 'doc_id' : '227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2', 'uri' : '/favicon.ico' }

如下是所有代碼:

import jsonimport hashlibimport redef addId(data): j=json.dumps(data).encode(’ascii’, ’ignore’) data[’doc_id’] = hashlib.sha224(j).hexdigest() return (data[’doc_id’], json.dumps(data))def parse(str): s=p.match(str) d = {} d[’ip’]=s.group(1) d[’date’]=s.group(4) d[’operation’]=s.group(5) d[’uri’]=s.group(6) return d regex=’^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] '(S+)s?(S+)?s?(S+)?' (d{3}|-) (d+|-)s?'?([^']*)'?s?'?([^']*)?'?$’p=re.compile(regex)rdd = sc.textFile('/home/ubuntu/walker/apache_logs')rdd2 = rdd.map(parse)rdd3 = rdd2.map(addID)es_write_conf = { 'es.nodes' : 'localhost', 'es.port' : '9200', 'es.resource' : ’walker/apache’, 'es.input.json': 'yes', 'es.mapping.id': 'doc_id' } rdd3.saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf)

也可以這么封裝,其實原理是一樣的

import hashlibimport jsonfrom pyspark import Sparkcontextdef make_md5(line): md5_obj=hashlib.md5() md5_obj.encode(line) return md5_obj.hexdigest()def parse(line): dic={} l = line.split(’t’) doc_id=make_md5(line) dic[’name’]=l[1] dic[’age’] =l[2] dic[’doc_id’]=doc_id return dic #記得這邊返回的是字典類型的,在寫入es之前要記得dumpsdef saveData2es(pdd, es_host, port,index, index_type, key): ''' 把saprk的運行結果寫入es :param pdd: 一個rdd類型的數據 :param es_host: 要寫es的ip :param index: 要寫入數據的索引 :param index_type: 索引的類型 :param key: 指定文檔的id,就是要以文檔的那個字段作為_id :return: ''' #實例es客戶端記得單例模式 if es.exist.index(index): es.index.create(index, ’spo’) es_write_conf = { 'es.nodes': es_host, 'es.port': port, 'es.resource': index/index_type, 'es.input.json': 'yes', 'es.mapping.id': key } (pdd.map(lambda _dic: (’’, json.dumps(_dic)))) #這百年是為把這個數據構造成元組格式,如果傳進來的_dic是字典則需要jdumps,如果傳進來之前就已經dumps,這便就不需要dumps了 .saveAsNewAPIHadoopFile( path=’-’, outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat', keyClass='org.apache.hadoop.io.NullWritable', valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=es_write_conf) )if __name__ == ’__main__’: #實例化sp對象 sc=Sparkcontext() #文件中的呢內容一行一行用sc的讀取出來 json_text=sc.textFile(’./1.txt’) #進行轉換 json_data=json_text.map(lambda line:parse(line)) saveData2es(json_data,’127.0.01’,’9200’,’index_test’,’index_type’,’doc_id’) sc.stop()

看到了把,面那個例子在寫入es之前加了一個id,返回一個元組格式的,現在這個封裝指定_id就會比較靈活了

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 电动葫芦|环链电动葫芦-北京凌鹰名优起重葫芦 | 西门子伺服电机维修,西门子电源模块维修,西门子驱动模块维修-上海渠利 | 双相钢_双相不锈钢_双相钢圆钢棒_双相不锈钢报价「海新双相钢」 双能x射线骨密度检测仪_dxa骨密度仪_双能x线骨密度仪_品牌厂家【品源医疗】 | 上海小程序开发-小程序制作-上海小程序定制开发公司-微信商城小程序-上海咏熠 | 防爆鼓风机-全风-宏丰鼓风机-上海梁瑾机电设备有限公司 | 恒温恒湿试验箱_高低温试验箱_恒温恒湿箱-东莞市高天试验设备有限公司 | 等离子空气净化器_医用空气消毒机_空气净化消毒机_中央家用新风系统厂家_利安达官网 | 机器视觉检测系统-视觉检测系统-机器视觉系统-ccd检测系统-视觉控制器-视控一体机 -海克易邦 | 防水套管_柔性防水套管_刚性防水套管-巩义市润达管道设备制造有限公司 | 井式炉-台车式回火炉-丹阳市电炉厂有限公司 | 广东护栏厂家-广州护栏网厂家-广东省安麦斯交通设施有限公司 | 房间温控器|LonWorks|海思| 马尔表面粗糙度仪-MAHR-T500Hommel-Mitutoyo粗糙度仪-笃挚仪器 | 柔软云母板-硬质-水位计云母片组件-首页-武汉长丰云母绝缘材料有限公司 | 浙江上沪阀门有限公司| 选矿设备-新型重选设备-金属矿尾矿重选-青州冠诚重工机械有限公司 | IIS7站长之家-站长工具-爱网站请使用IIS7站长综合查询工具,中国站长【WWW.IIS7.COM】 | 防水套管|柔性防水套管|伸缩器|伸缩接头|传力接头-河南伟创管道 防水套管_柔性防水套管_刚性防水套管-巩义市润达管道设备制造有限公司 | 权威废金属|废塑料|废纸|废铜|废钢价格|再生资源回收行情报价中心-中废网 | 苏州注册公司_苏州代理记账_苏州工商注册_苏州代办公司-恒佳财税 | 植筋胶-粘钢胶-碳纤维布-碳纤维板-环氧砂浆-加固材料生产厂家-上海巧力建筑科技有限公司 | 散热器厂家_暖气片_米德尔顿散热器 | 武汉天安盾电子设备有限公司 - 安盾安检,武汉安检门,武汉安检机,武汉金属探测器,武汉测温安检门,武汉X光行李安检机,武汉防爆罐,武汉车底安全检查,武汉液体探测仪,武汉安检防爆设备 | 东莞市天进机械有限公司-钉箱机-粘箱机-糊箱机-打钉机认准东莞天进机械-厂家直供更放心! | 乐考网-银行从业_基金从业资格考试_初级/中级会计报名时间_中级经济师 | 硅胶制品-硅橡胶制品-东莞硅胶制品厂家-广东帝博科技有限公司 | 杭州实验室尾气处理_实验台_实验室家具_杭州秋叶实验设备有限公司 | 全自动五线打端沾锡机,全自动裁线剥皮双头沾锡机,全自动尼龙扎带机-东莞市海文能机械设备有限公司 | 全自动固相萃取仪_高通量真空平行浓缩仪-勤业永为 | 超声波电磁流量计-液位计-孔板流量计-料位计-江苏信仪自动化仪表有限公司 | 山东成考网-山东成人高考网 | 深圳富泰鑫五金_五金冲压件加工_五金配件加工_精密零件加工厂 | 冷镦机-多工位冷镦机-高速冷镦机厂家-温州金诺机械设备制造有限公司 | MES系统工业智能终端_生产管理看板/安灯/ESOP/静电监控_讯鹏科技 | 环氧铁红防锈漆_环氧漆_无溶剂环氧涂料_环氧防腐漆-华川涂料 | 成都竞价托管_抖音代运营_网站建设_成都SEM外包-成都智网创联网络科技有限公司 | 手术室净化厂家_成都实验室装修公司_无尘车间施工单位_洁净室工程建设团队-四川华锐16年行业经验 | 陶瓷砂磨机,盘式砂磨机,棒销式砂磨机-无锡市少宏粉体科技有限公司 | 集装箱箱号识别_自重载重图像识别_铁路车号自动识别_OCR图像识别 | 通风气楼_通风天窗_屋顶风机-山东美创通风设备有限公司 | 执业药师报名条件,考试时间,考试真题,报名入口—首页 |