首頁技術文章正文

Java培訓:基于XXL-JOB實現(xiàn)分布式任務調(diào)度的實現(xiàn)

更新時間:2022-08-26 來源:黑馬程序員 瀏覽量:

  背景

  筆者以前在電商公司,我們需要在8月18號做大促活動,我們會提前一天給所有的用戶推送活動信息,且需要根據(jù)用戶畫像生成不同的推送內(nèi)容。

  當時我們總共有80萬用戶左右。

  經(jīng)測試,通過Spring Task和分布式鎖,單臺機器同時開啟5個線程,執(zhí)行時間需要27個小時左右,即便開10個線程,需要14個小時左右,顯然執(zhí)行時間過長。

  解決方案

  當時個推服務部署節(jié)點有3臺,在每年大促期間可動態(tài)擴容,其余的機器資源沒有充分利用起來。

  要想短時間內(nèi)完成推送,那么就得想辦法讓每臺機器各自分一部分用戶數(shù)據(jù)去執(zhí)行,這樣效率可提高原來的N倍。

  那么就需要分布式任務去執(zhí)行,核心思想如下圖:

1661479930520_1.jpg

  經(jīng)過調(diào)研現(xiàn)有的開源的分布式任務調(diào)度框架,決定在elastic-job和xxl-job中選一個

  [Elastic Job](https://github.com/elasticjob)是當當網(wǎng)開源一個分布式調(diào)度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成;定位為輕量級無中心化解決方案,使用 jar 包的形式提供分布式任務的協(xié)調(diào)服務。支持分布式調(diào)度協(xié)調(diào)、彈性擴容縮容、失效轉(zhuǎn)移、錯過執(zhí)行作業(yè)重觸發(fā)、并行調(diào)度、自診斷和修復等等功能特性。

  [XXL-Job官網(wǎng)](https://github.com/xuxueli/xxl-job)是大眾點評發(fā)布的分布式任務調(diào)度平臺,其核心設計目標是開發(fā)迅速、學習簡單、輕量級、易擴展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。

  更傾向于選擇XXL-JOB:

  1. 輕量級,支持通過Web頁面對任務進行動態(tài)CRUD操作,操作簡單

  2. 只依賴數(shù)據(jù)庫作為集群注冊中心,接入開發(fā)簡單,不需要ZK

  3. 高可用、解耦、高性能、監(jiān)控報警、分片、重試、故障轉(zhuǎn)移

  4. 團隊持續(xù)開發(fā),社區(qū)活躍

  5. 支持后臺直接查看每個任務執(zhí)行實時日志

  具體實現(xiàn)

  在項目中集成xxl-job客戶端

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

  在配置文件中配置xxl-job信息

xxl:
  job:
    accessToken:
    admin:
      addresses: http://xxl部署IP地址:8080/xxl-job-admin
    executor:
      appname: vm-service
      address:
      ip:
      port: 9989
      logretentiondays: 30

  新增XxlJobConfig.java

package com.itheima.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
@Slf4j
public class XxlJobConfig {


    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

//    @Value("${xxl.job.executor.logpath}")
//    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        //xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

}

  在xxl-job中新增執(zhí)行器

  注冊方式選自動注冊,這樣方便動態(tài)擴容

1661480124531_2.jpg

  創(chuàng)建任務

  路由策略選擇分片廣播

1661480137880_3.jpg

  代碼部分

  在任務代碼獲取推送用戶時,根據(jù)當前的分片及分片總數(shù)對用戶ID取余,這樣我們就可以在每個分片節(jié)點,獲取不一樣的數(shù)據(jù)。id值越連續(xù),分片則越均勻。

 ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
 int numbers = shardingVo.getTotal();  //分片總數(shù)
 int index = shardingVo.getIndex(); //當前分片索引

  假設分片總數(shù)為3,當前節(jié)點獲取到的分片索引為0,那么查詢推送用戶SQL如下:

SELECT user_id FROM `user_info` WHERE MOD(user_id,3)=0

  注意:我們在實際代碼中,分片總數(shù)和當前分片索引是以參數(shù)的形式傳給查詢的SQL語句的。

  如上,即可完成分布式任務。

  總結

  在某些定時任務需要處理大量數(shù)據(jù)的情況下,我們可以通過引入分布式任務框架xxl-job,充分利用機器資源,將需要處理的數(shù)據(jù)均勻的分配到不同的機器上去執(zhí)行,提高任務執(zhí)行效率。

分享到:
在線咨詢 我要報名
和我們在線交談!