电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

詳解批處理框架之Spring Batch

瀏覽:2日期:2023-07-07 13:48:36
目錄一、Spring Batch的概念知識1.1、分層架構1.2、關鍵概念1.2.1、JobRepository1.2.2、任務啟動器JobLauncher1.2.3、任務Job1.2.4、步驟Step1.2.5、輸入——處理——輸出二、代碼實例2.1、基本框架2.2、輸入——處理——輸出2.2.1、讀取ItemReader2.2.2、處理ItemProcessor2.2.3、輸出ItremWriter2.3、Step2.4、Job2.5、運行三、監聽Listener一、Spring Batch的概念知識1.1、分層架構

Spring Batch的分層架構圖如下:

詳解批處理框架之Spring Batch

可以看到它分為三層,分別是:

Application應用層:包含了所有任務batch jobs和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。 Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。 Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。1.2、關鍵概念

理解下圖所涉及的概念至關重要,不然很難進行后續開發和問題分析。

詳解批處理框架之Spring Batch

1.2.1、JobRepository

專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴數據庫來管理的。

1.2.2、任務啟動器JobLauncher

負責啟動任務Job。

1.2.3、任務Job

Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。

詳解批處理框架之Spring Batch

上圖介紹了Job的一些相關概念:

Job:封裝處理實體,定義過程邏輯。 JobInstance:Job的運行實例,不同的實例,參數不同,所以定義好一個Job后可以通過不同參數運行多次。 JobParameters:與JobInstance相關聯的參數。 JobExecution:代表Job的一次實際執行,可能成功、可能失敗。

所以,開發人員要做的事情,就是定義Job。

1.2.4、步驟Step

Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。

詳解批處理框架之Spring Batch

通過定義Step來組裝Job可以更靈活地實現復雜的業務邏輯。

1.2.5、輸入——處理——輸出

所以,定義一個Job關鍵是定義好一個或多個Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數據,然后通過Item Processor進行業務處理和數據轉換,最后通過Item Writer寫到數據庫中去。

Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。

二、代碼實例

理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理后輸出到一個csv文件。

2.1、基本框架

添加依賴:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope></dependency>

需要添加Spring Batch的依賴,同時使用H2作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle、PostgreSQL。

入口主類:

@SpringBootApplication@EnableBatchProcessingpublic class PkslowBatchJobMain { public static void main(String[] args) {SpringApplication.run(PkslowBatchJobMain.class, args); }}

也很簡單,只是在Springboot的基礎上添加注解@EnableBatchProcessing。

領域實體類Employee:

package com.pkslow.batch.entity;public class Employee { String id; String firstName; String lastName;}

對應的csv文件內容如下:

id,firstName,lastName

1,Lokesh,Gupta

2,Amit,Mishra

3,Pankaj,Kumar

4,David,Miller

2.2、輸入——處理——輸出2.2.1、讀取ItemReader

因為有多個輸入文件,所以定義如下:

@Value('input/inputData*.csv')private Resource[] inputResources;@Beanpublic MultiResourceItemReader<Employee> multiResourceItemReader(){ MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader;}@Beanpublic FlatFileItemReader<Employee> reader(){ FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() {{ //字段名 setNames(new String[] { 'id', 'firstName', 'lastName' });} }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {{ //轉換化后的目標類 setTargetType(Employee.class);} }); } }); return reader;}

這里使用了FlatFileItemReader,方便我們從文件讀取數據。

2.2.2、處理ItemProcessor

為了簡單演示,處理很簡單,就是把最后一列轉為大寫:

public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; };}2.2.3、輸出ItremWriter

比較簡單,代碼及注釋如下:

private Resource outputResource = new FileSystemResource('output/outputData.csv');@Beanpublic FlatFileItemWriter<Employee> writer(){ FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設置分割符 setDelimiter(','); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {{ //設置字段 setNames(new String[] { 'id', 'firstName', 'lastName' });} }); } }); return writer;}2.3、Step

有了Reader-Processor-Writer后,就可以定義Step了:

@Beanpublic Step csvStep() { return stepBuilderFactory.get('csvStep').<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build();}

這里有一個chunk的設置,值為5,意思是5條記錄后再提交輸出,可以根據自己需求定義。

2.4、Job

完成了Step的編碼,定義Job就容易了:

@Beanpublic Job pkslowCsvJob() { return jobBuilderFactory .get('pkslowCsvJob') .incrementer(new RunIdIncrementer()) .start(csvStep()) .build();}2.5、運行

完成以上編碼后,執行程序,結果如下:

詳解批處理框架之Spring Batch

成功讀取數據,并將最后字段轉為大寫,并輸出到outputData.csv文件。

三、監聽Listener

可以通過Listener接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數據等。

我們分別對Read、Process和Write事件進行監聽,對應分別要實現ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener的實現代碼:

public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) {logger.info('beforeWrite: ' + list); } @Override public void afterWrite(List<? extends Employee> list) {logger.info('afterWrite: ' + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) {logger.info('onWriteError: ' + list); }}

把實現的監聽器listener整合到Step中去:

@Beanpublic Step csvStep() { return stepBuilderFactory.get('csvStep').<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build();}

執行后看一下日志:

詳解批處理框架之Spring Batch

這里就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。

以上就是詳解Spring Batch入門之優秀的批處理框架的詳細內容,更多關于Spring Batch 批處理框架的資料請關注好吧啦網其它相關文章!

標簽: Spring
相關文章:
主站蜘蛛池模板: 合肥钣金加工-安徽激光切割加工-机箱机柜加工厂家-合肥通快 | 老房子翻新装修,旧房墙面翻新,房屋防水补漏,厨房卫生间改造,室内装潢装修公司 - 一修房屋快修官网 | 上海单片机培训|重庆曙海培训分支机构—CortexM3+uC/OS培训班,北京linux培训,Windows驱动开发培训|上海IC版图设计,西安linux培训,北京汽车电子EMC培训,ARM培训,MTK培训,Android培训 | 缓蚀除垢剂_循环水阻垢剂_反渗透锅炉阻垢剂_有机硫化物-郑州威大水处理材料有限公司 | 昊宇水工|河北昊宇水工机械工程有限公司 | 淄博不锈钢,淄博不锈钢管,淄博不锈钢板-山东振远合金科技有限公司 | BESWICK球阀,BESWICK接头,BURKERT膜片阀,美国SEL继电器-东莞市广联自动化科技有限公司 | 深圳APP开发_手机软件APP定制外包_小程序开发公司-来科信 | 钢格板|镀锌钢格板|热镀锌钢格板|格栅板|钢格板|钢格栅板|热浸锌钢格板|平台钢格板|镀锌钢格栅板|热镀锌钢格栅板|平台钢格栅板|不锈钢钢格栅板 - 专业钢格板厂家 | 钢板仓,大型钢板仓,钢板库,大型钢板库,粉煤灰钢板仓,螺旋钢板仓,螺旋卷板仓,骨料钢板仓 | 回转窑-水泥|石灰|冶金-巩义市瑞光金属制品有限责任公司 | 珠海网站建设_响应网站建设_珠海建站公司_珠海网站设计与制作_珠海网讯互联 | 搬运设备、起重设备、吊装设备—『龙海起重成套设备』 | 上海办公室设计_办公楼,写字楼装修_办公室装修公司-匠御设计 | 大功率金属激光焊接机价格_不锈钢汽车配件|光纤自动激光焊接机设备-东莞市正信激光科技有限公司 定制奶茶纸杯_定制豆浆杯_广东纸杯厂_[绿保佳]一家专业生产纸杯碗的厂家 | 单螺旋速冻机-双螺旋-流态化-隧道式-食品速冻机厂家-广州冰泉制冷 | CTP磁天平|小电容测量仪|阴阳极极化_双液系沸点测定仪|dsj电渗实验装置-南京桑力电子设备厂 | 精密冲床,高速冲床等冲压设备生产商-常州晋志德压力机厂 | 档案密集柜_手动密集柜_智能密集柜_内蒙古档案密集柜-盛隆柜业内蒙古密集柜直销中心 | 玉米加工设备,玉米深加工机械,玉米糁加工设备.玉米脱皮制糁机 华豫万通粮机 | 3d打印服务,3d打印汽车,三维扫描,硅胶复模,手板,快速模具,深圳市精速三维打印科技有限公司 | 电池高低温试验箱-气态冲击箱-双层电池防爆箱|简户百科 | 紫外可见光分光度计-紫外分光度计-分光光度仪-屹谱仪器制造(上海)有限公司 | 中式装修设计_室内中式装修_【云臻轩】中式设计机构 | 一氧化氮泄露报警器,二甲苯浓度超标报警器-郑州汇瑞埔电子技术有限公司 | 扒渣机厂家_扒渣机价格_矿用扒渣机_铣挖机_撬毛台车_襄阳永力通扒渣机公司 | 英国雷迪地下管线探测仪-雷迪RD8100管线仪-多功能数字听漏仪-北京迪瑞进创科技有限公司 | 恒温恒湿试验箱厂家-高低温试验箱维修价格_东莞环仪仪器_东莞环仪仪器 | 多功能干燥机,过滤洗涤干燥三合一设备-无锡市张华医药设备有限公司 | 电动球阀_不锈钢电动球阀_电动三通球阀_电动调节球阀_上海湖泉阀门有限公司 | 医用酒精_84消毒液_碘伏消毒液等医用消毒液-漓峰消毒官网 | 商秀—企业短视频代运营_抖音企业号托管 | 艾默生变频器,艾默生ct,变频器,ct驱动器,广州艾默生变频器,供水专用变频器,风机变频器,电梯变频器,艾默生变频器代理-广州市盟雄贸易有限公司官方网站-艾默生变频器应用解决方案服务商 | 磁力加热搅拌器-多工位|大功率|数显恒温磁力搅拌器-司乐仪器官网 | 直线模组_滚珠丝杆滑台_模组滑台厂家_万里疆科技 | 气弹簧定制-气动杆-可控气弹簧-不锈钢阻尼器-工业气弹簧-可调节气弹簧厂家-常州巨腾气弹簧供应商 | 专业的新乡振动筛厂家-振动筛品质保障-环保振动筛价格—新乡市德科筛分机械有限公司 | 深圳希玛林顺潮眼科医院(官网)│深圳眼科医院│医保定点│香港希玛林顺潮眼科中心连锁品牌 | 高压无油空压机_无油水润滑空压机_水润滑无油螺杆空压机_无油空压机厂家-科普柯超滤(广东)节能科技有限公司 | 广州监控安装公司_远程监控_安防弱电工程_无线wifi覆盖_泉威安防科技 | 北京发电车出租-发电机租赁公司-柴油发电机厂家 - 北京明旺盛安机电设备有限公司 |